提交 8913283c 编写于 作者: R Rossen Stoyanchev

Add AnnotationStompService

上级 b194d4d6
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Annotation indicating a method parameter should be bound to the body of a message.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MessageBody {
/**
* Whether body content is required.
* <p>Default is {@code true}, leading to an exception thrown in case
* there is no body content. Switch this to {@code false} if you prefer
* {@code null} to be passed when the body content is {@code null}.
*/
boolean required() default true;
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging;
import java.io.IOException;
import org.springframework.http.MediaType;
import org.springframework.web.messaging.converter.ContentTypeNotSupportedException;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface MessageBroker {
void send(String destination, Object content, MediaType contentType)
throws IOException, ContentTypeNotSupportedException;
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MessageMapping {
/**
* The primary mapping expressed by this annotation.
* <p>The destination for a message (e.g. "/topic/echo").
*/
String[] value() default {};
/**
* TODO
*/
MessageType messageType() default MessageType.NONE;
}
......@@ -23,6 +23,6 @@ package org.springframework.web.messaging;
*/
public enum MessageType {
CONNECT, SUBSCRIBE, UNSUBSCRIBE, SEND, NONE
CONNECT, SUBSCRIBE, UNSUBSCRIBE, SEND, OTHER, NONE
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging;
import java.io.IOException;
import org.springframework.http.MediaType;
import org.springframework.web.messaging.converter.ContentTypeNotSupportedException;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface Subscription {
void reply(Object content) throws IOException, ContentTypeNotSupportedException;
void reply(Object content, MediaType mediaType) throws IOException, ContentTypeNotSupportedException;
}
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.converter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.http.MediaType;
import org.springframework.util.Assert;
/**
* Abstract base class for most {@link MessageConverter} implementations.
*
* <p>This base class adds support for setting supported {@code MediaTypes}, through the
* {@link #setSupportedMediaTypes(List) supportedMediaTypes} property.
*
* @author Rossen Stoyanchev
* @author Arjen Poutsma
* @since 4.0
*/
public abstract class AbstractMessageConverter<T> implements MessageConverter<T> {
/** Logger available to subclasses */
protected final Log logger = LogFactory.getLog(getClass());
private List<MediaType> supportedMediaTypes = Collections.emptyList();
/**
* Construct an {@code AbstractMessageConverter} with no supported media types.
* @see #setSupportedMediaTypes
*/
protected AbstractMessageConverter() {
}
/**
* Construct an {@code AbstractMessageConverter} with one supported media type.
*
* @param supportedMediaType the supported media type
*/
protected AbstractMessageConverter(MediaType supportedMediaType) {
setSupportedMediaTypes(Collections.singletonList(supportedMediaType));
}
/**
* Construct an {@code AbstractMessageConverter} with multiple supported media type.
*
* @param supportedMediaTypes the supported media types
*/
protected AbstractMessageConverter(MediaType... supportedMediaTypes) {
setSupportedMediaTypes(Arrays.asList(supportedMediaTypes));
}
/**
* Set the list of {@link MediaType} objects supported by this converter.
*/
public void setSupportedMediaTypes(List<MediaType> supportedMediaTypes) {
Assert.notEmpty(supportedMediaTypes, "'supportedMediaTypes' must not be empty");
this.supportedMediaTypes = new ArrayList<MediaType>(supportedMediaTypes);
}
@Override
public List<MediaType> getSupportedMediaTypes() {
return Collections.unmodifiableList(this.supportedMediaTypes);
}
/**
* This implementation checks if the given class is {@linkplain #supports(Class)
* supported}, and if the {@linkplain #getSupportedMediaTypes() supported media types}
* {@linkplain MediaType#includes(MediaType) include} the given media type.
*/
@Override
public boolean canConvertFromPayload(Class<?> clazz, MediaType mediaType) {
return supports(clazz) && canConvertFrom(mediaType);
}
/**
* Indicates whether the given class is supported by this converter.
* @param clazz the class to test for support
* @return {@code true} if supported; {@code false} otherwise
*/
protected abstract boolean supports(Class<?> clazz);
/**
* Returns true if any of the {@linkplain #setSupportedMediaTypes(List) supported
* media types} include the given media type.
*
* @param mediaType the media type to read, can be {@code null} if not specified.
* Typically the value of a {@code Content-Type} header.
* @return {@code true} if the supported media types include the media type, or if the
* media type is {@code null}
*/
protected boolean canConvertFrom(MediaType mediaType) {
if (mediaType == null) {
return true;
}
for (MediaType supportedMediaType : getSupportedMediaTypes()) {
if (supportedMediaType.includes(mediaType)) {
return true;
}
}
return false;
}
/**
* This implementation checks if the given class is {@linkplain #supports(Class)
* supported}, and if the {@linkplain #getSupportedMediaTypes() supported media types}
* {@linkplain MediaType#includes(MediaType) include} the given media type.
*/
@Override
public boolean canConvertToPayload(Class<?> clazz, MediaType mediaType) {
return supports(clazz) && canConvertTo(mediaType);
}
/**
* Returns {@code true} if the given media type includes any of the
* {@linkplain #setSupportedMediaTypes(List) supported media types}.
*
* @param mediaType the media type to write, can be {@code null} if not specified.
* Typically the value of an {@code Accept} header.
* @return {@code true} if the supported media types are compatible with the media
* type, or if the media type is {@code null}
*/
protected boolean canConvertTo(MediaType mediaType) {
if (mediaType == null || MediaType.ALL.equals(mediaType)) {
return true;
}
for (MediaType supportedMediaType : getSupportedMediaTypes()) {
if (supportedMediaType.isCompatibleWith(mediaType)) {
return true;
}
}
return false;
}
/**
* This implementation simply delegates to
* {@link #convertFromPayloadInternal(Class, MediaType, byte[])}. Future
* implementations might add some default behavior, however.
*/
@Override
public T convertFromPayload(Class<? extends T> clazz, MediaType contentType, byte[] payload)
throws IOException {
return convertFromPayloadInternal(clazz, contentType, payload);
}
/**
* Abstract template method that reads the actual object. Invoked from {@link #read}.
* @param clazz the type of object to return
* @param contentType
* @param payload the content to convert from
* @return the converted object
* @throws IOException in case of I/O errors
*/
protected abstract T convertFromPayloadInternal(Class<? extends T> clazz, MediaType contentType,
byte[] payload) throws IOException;
/**
* This implementation simply delegates to
* {@link #convertToPayloadInternal(Object, MediaType)}. Future
* implementations might add some default behavior, however.
*/
@Override
public byte[] convertToPayload(T t, MediaType contentType) throws IOException {
return convertToPayloadInternal(t, contentType);
}
protected abstract byte[] convertToPayloadInternal(T t, MediaType contentType)
throws IOException;
/**
* Returns the default content type for the given type. Called when {@link #write}
* is invoked without a specified content type parameter.
* <p>By default, this returns the first element of the
* {@link #setSupportedMediaTypes(List) supportedMediaTypes} property, if any.
* Can be overridden in subclasses.
* @param t the type to return the content type for
* @return the content type, or {@code null} if not known
*/
protected MediaType getDefaultContentType(T t) throws IOException {
List<MediaType> mediaTypes = getSupportedMediaTypes();
return (!mediaTypes.isEmpty() ? mediaTypes.get(0) : null);
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.converter;
import org.springframework.http.MediaType;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class ContentTypeNotSupportedException extends Exception {
private final MediaType mediaType;
private final Class<?> sourceOrTargetType;
public ContentTypeNotSupportedException(MediaType mediaType, Class<?> sourceOrTargetType) {
super("Content type '" + mediaType + "' not supported");
this.mediaType = mediaType;
this.sourceOrTargetType = sourceOrTargetType;
}
public MediaType getMediaType() {
return this.mediaType;
}
public Class<?> getSourceOrTargetType() {
return this.sourceOrTargetType;
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.converter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Type;
import org.springframework.http.MediaType;
import org.springframework.http.converter.HttpMessageNotReadableException;
import org.springframework.util.Assert;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
/**
* Implementation of {@link MessageConverter} that can read and write JSON using <a
* href="http://jackson.codehaus.org/">Jackson 2.x's</a> {@link ObjectMapper}.
* <p>
* This converter can be used to bind to typed beans, or untyped {@link java.util.HashMap
* HashMap} instances.
* <p>
* By default, this converter supports {@code application/json}. This can be overridden by
* setting the {@link #setSupportedMediaTypes supportedMediaTypes} property.
* <p>
* Tested against Jackson 2.2; compatible with Jackson 2.0 and higher.
*
* @author Rossen Stoyanchev
* @author Arjen Poutsma
* @since 4.0
*/
public class MappingJackson2MessageConverter extends AbstractMessageConverter<Object> {
private ObjectMapper objectMapper = new ObjectMapper();
private boolean prefixJson = false;
private Boolean prettyPrint;
/**
* Construct a new {@code MappingJackson2HttpMessageConverter}.
*/
public MappingJackson2MessageConverter() {
super(new MediaType("application", "json"), new MediaType("application", "*+json"));
}
/**
* Set the {@code ObjectMapper} for this view. If not set, a default
* {@link ObjectMapper#ObjectMapper() ObjectMapper} is used.
* <p>
* Setting a custom-configured {@code ObjectMapper} is one way to take further control
* of the JSON serialization process. For example, an extended
* {@link org.codehaus.jackson.map.SerializerFactory} can be configured that provides
* custom serializers for specific types. The other option for refining the
* serialization process is to use Jackson's provided annotations on the types to be
* serialized, in which case a custom-configured ObjectMapper is unnecessary.
*/
public void setObjectMapper(ObjectMapper objectMapper) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
this.objectMapper = objectMapper;
configurePrettyPrint();
}
/**
* Return the underlying {@code ObjectMapper} for this view.
*/
public ObjectMapper getObjectMapper() {
return this.objectMapper;
}
/**
* Indicate whether the JSON output by this view should be prefixed with "{} &&".
* Default is false.
* <p>
* Prefixing the JSON string in this manner is used to help prevent JSON Hijacking.
* The prefix renders the string syntactically invalid as a script so that it cannot
* be hijacked. This prefix does not affect the evaluation of JSON, but if JSON
* validation is performed on the string, the prefix would need to be ignored.
*/
public void setPrefixJson(boolean prefixJson) {
this.prefixJson = prefixJson;
}
/**
* Whether to use the {@link DefaultPrettyPrinter} when writing JSON.
* This is a shortcut for setting up an {@code ObjectMapper} as follows:
* <pre class="code">
* ObjectMapper mapper = new ObjectMapper();
* mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
* converter.setObjectMapper(mapper);
* </pre>
*/
public void setPrettyPrint(boolean prettyPrint) {
this.prettyPrint = prettyPrint;
configurePrettyPrint();
}
private void configurePrettyPrint() {
if (this.prettyPrint != null) {
this.objectMapper.configure(SerializationFeature.INDENT_OUTPUT, this.prettyPrint);
}
}
@Override
public boolean canConvertFromPayload(Class<?> clazz, MediaType mediaType) {
JavaType javaType = getJavaType(clazz, null);
return (this.objectMapper.canDeserialize(javaType) && canConvertFrom(mediaType));
}
@Override
public boolean canConvertToPayload(Class<?> clazz, MediaType mediaType) {
return (this.objectMapper.canSerialize(clazz) && canConvertTo(mediaType));
}
@Override
protected boolean supports(Class<?> clazz) {
// should not be called, since we override canRead/Write instead
throw new UnsupportedOperationException();
}
@Override
protected Object convertFromPayloadInternal(Class<? extends Object> clazz,
MediaType contentType, byte[] payload) throws IOException {
JavaType javaType = getJavaType(clazz, null);
return readJavaType(javaType, payload);
}
private Object readJavaType(JavaType javaType, byte[] payload) {
try {
return this.objectMapper.readValue(payload, javaType);
}
catch (IOException ex) {
throw new HttpMessageNotReadableException("Could not read JSON: " + ex.getMessage(), ex);
}
}
@Override
protected byte[] convertToPayloadInternal(Object object, MediaType contentType) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
// The following has been deprecated as late as Jackson 2.2 (April 2013);
// preserved for the time being, for Jackson 2.0/2.1 compatibility.
@SuppressWarnings("deprecation")
JsonGenerator jsonGenerator =
this.objectMapper.getJsonFactory().createJsonGenerator(out, JsonEncoding.UTF8);
// A workaround for JsonGenerators not applying serialization features
// https://github.com/FasterXML/jackson-databind/issues/12
if (this.objectMapper.isEnabled(SerializationFeature.INDENT_OUTPUT)) {
jsonGenerator.useDefaultPrettyPrinter();
}
try {
if (this.prefixJson) {
jsonGenerator.writeRaw("{} && ");
}
this.objectMapper.writeValue(jsonGenerator, object);
}
catch (JsonProcessingException ex) {
// TODO: more specific exception
throw new IllegalStateException("Could not write JSON: " + ex.getMessage(), ex);
}
return out.toByteArray();
}
/**
* Return the Jackson {@link JavaType} for the specified type and context class.
* <p>The default implementation returns {@code typeFactory.constructType(type, contextClass)},
* but this can be overridden in subclasses, to allow for custom generic collection handling.
* For instance:
* <pre class="code">
* protected JavaType getJavaType(Type type) {
* if (type instanceof Class && List.class.isAssignableFrom((Class)type)) {
* return TypeFactory.collectionType(ArrayList.class, MyBean.class);
* } else {
* return super.getJavaType(type);
* }
* }
* </pre>
* @param type the type to return the java type for
* @param contextClass a context class for the target type, for example a class
* in which the target type appears in a method signature, can be {@code null}
* signature, can be {@code null}
* @return the java type
*/
protected JavaType getJavaType(Type type, Class<?> contextClass) {
return this.objectMapper.getTypeFactory().constructType(type, contextClass);
}
}
/*
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.converter;
import java.io.IOException;
import java.util.List;
import org.springframework.http.MediaType;
/**
* Strategy for converting byte a array message payload to and from a typed object.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface MessageConverter<T> {
/**
* Whether the given class can be converted from a byte array by this converter.
*
* @param clazz the class to convert from
* @param mediaType the media type of the content, can be {@code null} if not
* specified. Typically the value of a {@code Content-Type} header.
* @return {@code true} if it can be converted; {@code false} otherwise
*/
boolean canConvertFromPayload(Class<?> clazz, MediaType mediaType);
/**
* Convert the payload of the given type.
*
* @param clazz the type of object to return. This type must have previously been
* passed to {@link #canConvertFromPayload(Class, MediaType)} and it must have
* returned {@code true}.
* @param contentType the content type of the payload, can be {@code null}
* @param payload the payload to convert from
* @return the converted object
* @throws IOException in case of I/O errors
*/
T convertFromPayload(Class<? extends T> clazz, MediaType contentType, byte[] payload) throws IOException;
/**
* Whether the given class can be converted to a byte array by this converter.
*
* @param clazz the class to test
* @param mediaType the media type of the content, can be {@code null} if not specified.
* @return {@code true} if writable; {@code false} otherwise
*/
boolean canConvertToPayload(Class<?> clazz, MediaType mediaType);
/**
* Convert the given object to a byte array.
*
* @param t the object to convert. The type of this object must have previously been
* passed to {@link #canConvertToPayload(Class, MediaType)} and it must have returned
* {@code true}.
* @param headers
* @return the output message
* @throws IOException in case of I/O errors
*/
byte[] convertToPayload(T t, MediaType contentType) throws IOException;
/**
* Return the list of {@link MediaType} objects supported by this converter.
*
* @return the list of supported media types
*/
List<MediaType> getSupportedMediaTypes();
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.converter;
import java.io.IOException;
import java.nio.charset.Charset;
import org.springframework.http.MediaType;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class StringMessageConverter extends AbstractMessageConverter<String> {
private static final Charset UTF_8 = Charset.forName("UTF-8");
public StringMessageConverter() {
super(MediaType.TEXT_PLAIN);
}
@Override
protected boolean supports(Class<?> clazz) {
return String.class.equals(clazz);
}
@Override
protected String convertFromPayloadInternal(Class<? extends String> clazz, MediaType contentType,
byte[] payload) throws IOException {
return new String(payload, UTF_8);
}
@Override
protected byte[] convertToPayloadInternal(String content, MediaType contentType) throws IOException {
return content.getBytes(UTF_8);
}
}
......@@ -16,6 +16,8 @@
package org.springframework.web.messaging.stomp;
import org.springframework.web.messaging.MessageType;
/**
*
......@@ -41,6 +43,25 @@ public enum StompCommand {
CONNECTED,
MESSAGE,
RECEIPT,
ERROR
ERROR;
public MessageType getMessageType() {
if (this == CONNECT) {
return MessageType.CONNECT;
}
else if (this == SUBSCRIBE) {
return MessageType.SUBSCRIBE;
}
else if (this == UNSUBSCRIBE) {
return MessageType.UNSUBSCRIBE;
}
else if (this == SEND) {
return MessageType.SEND;
}
else {
return MessageType.OTHER;
}
}
}
......@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.http.MediaType;
import org.springframework.util.Assert;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
......@@ -66,6 +67,10 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
private static final String DESTINATION = "destination";
private static final String CONTENT_TYPE = "content-type";
private static final String CONTENT_LENGTH = "content-length";
private static final String HEARTBEAT = "heart-beat";
......@@ -129,6 +134,29 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
set(DESTINATION, destination);
}
public MediaType getContentType() {
String contentType = getFirst(CONTENT_TYPE);
return StringUtils.hasText(contentType) ? MediaType.valueOf(contentType) : null;
}
public void setContentType(MediaType mediaType) {
if (mediaType != null) {
set(CONTENT_TYPE, mediaType.toString());
}
else {
remove(CONTENT_TYPE);
}
}
public Integer getContentLength() {
String contentLength = getFirst(CONTENT_LENGTH);
return StringUtils.hasText(contentLength) ? new Integer(contentLength) : null;
}
public void setContentLength(int contentLength) {
set(CONTENT_LENGTH, String.valueOf(contentLength));
}
public long[] getHeartbeat() {
String rawValue = getFirst(HEARTBEAT);
if (!StringUtils.hasText(rawValue)) {
......
......@@ -66,7 +66,7 @@ public class StompMessage {
this.sessionId = sessionId;
}
public String getStompSessionId() {
public String getSessionId() {
return this.sessionId;
}
......
......@@ -16,9 +16,15 @@
package org.springframework.web.messaging.stomp.service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.Assert;
import org.springframework.util.PathMatcher;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompMessage;
......@@ -39,6 +45,12 @@ public abstract class AbstractStompService {
private final Reactor reactor;
private final List<String> allowedDestinations = new ArrayList<String>();
private final List<String> disallowedDestinations = new ArrayList<String>();
private final PathMatcher pathMatcher = new AntPathMatcher();
public AbstractStompService(Reactor reactor) {
......@@ -54,13 +66,19 @@ public abstract class AbstractStompService {
this.reactor.on(Fn.$(StompCommand.SUBSCRIBE), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processSubscribe(event.getData(), event.getReplyTo());
StompMessage message = event.getData();
if (isAllowedDestination(message)) {
processSubscribe(event.getData(), event.getReplyTo());
}
}
});
this.reactor.on(Fn.$(StompCommand.SEND), new Consumer<Event<StompMessage>>() {
@Override
public void accept(Event<StompMessage> event) {
processSend(event.getData());
StompMessage message = event.getData();
if (isAllowedDestination(message)) {
processSend(event.getData());
}
}
});
this.reactor.on(Fn.$(StompCommand.DISCONNECT), new Consumer<Event<StompMessage>>() {
......@@ -108,10 +126,49 @@ public abstract class AbstractStompService {
}
/**
* Ant-style destination patterns that this STOMP service is allowed to process.
*/
public void setAllowedDestinations(String... patterns) {
this.allowedDestinations.clear();
this.allowedDestinations.addAll(Arrays.asList(patterns));
}
/**
* Ant-style destination patterns that this STOMP service should skip.
*/
public void setDisallowedDestinations(String... patterns) {
this.disallowedDestinations.clear();
this.disallowedDestinations.addAll(Arrays.asList(patterns));
}
public Reactor getReactor() {
return this.reactor;
}
private boolean isAllowedDestination(StompMessage message) {
String destination = message.getHeaders().getDestination();
if (destination == null) {
return true;
}
if (!this.disallowedDestinations.isEmpty()) {
for (String pattern : this.disallowedDestinations) {
if (this.pathMatcher.match(pattern, destination)) {
return false;
}
}
}
if (!this.allowedDestinations.isEmpty()) {
for (String pattern : this.allowedDestinations) {
if (this.pathMatcher.match(pattern, destination)) {
return true;
}
}
return false;
}
return true;
}
protected void processConnect(StompMessage message, Object replyTo) {
}
......
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Controller;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils.MethodFilter;
import org.springframework.web.messaging.MessageMapping;
import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.converter.StringMessageConverter;
import org.springframework.web.messaging.stomp.StompMessage;
import org.springframework.web.messaging.stomp.service.support.MessageBodyArgumentResolver;
import org.springframework.web.messaging.stomp.service.support.MessageBrokerArgumentResolver;
import org.springframework.web.messaging.stomp.service.support.SubscriptionArgumentResolver;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.method.HandlerMethodSelector;
import reactor.core.Reactor;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class AnnotationStompService extends AbstractStompService
implements ApplicationContextAware, InitializingBean {
private List<MessageConverter<?>> messageConverters;
private ApplicationContext applicationContext;
private Map<MessageMapping, HandlerMethod> handlerMethods = new HashMap<MessageMapping, HandlerMethod>();
private MessageMethodArgumentResolverComposite argumentResolvers = new MessageMethodArgumentResolverComposite();
public AnnotationStompService(Reactor reactor) {
super(reactor);
}
public void setMessageConverters(List<MessageConverter<?>> messageConverters) {
this.messageConverters = messageConverters;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void afterPropertiesSet() {
initHandlerMethods();
if (this.messageConverters == null) {
this.messageConverters = new ArrayList<MessageConverter<?>>();
this.messageConverters.add(new StringMessageConverter());
this.messageConverters.add(new MappingJackson2MessageConverter());
}
this.argumentResolvers.addResolver(new SubscriptionArgumentResolver(getReactor(), this.messageConverters));
this.argumentResolvers.addResolver(new MessageBrokerArgumentResolver(getReactor(), this.messageConverters));
this.argumentResolvers.addResolver(new MessageBodyArgumentResolver(this.messageConverters));
}
protected void initHandlerMethods() {
String[] beanNames = this.applicationContext.getBeanNamesForType(Object.class);
for (String beanName : beanNames) {
if (isHandler(this.applicationContext.getType(beanName))){
detectHandlerMethods(beanName);
}
}
}
protected boolean isHandler(Class<?> beanType) {
return ((AnnotationUtils.findAnnotation(beanType, Controller.class) != null) ||
(AnnotationUtils.findAnnotation(beanType, MessageMapping.class) != null));
}
protected void detectHandlerMethods(final Object handler) {
Class<?> handlerType = (handler instanceof String) ?
this.applicationContext.getType((String) handler) : handler.getClass();
final Class<?> userType = ClassUtils.getUserClass(handlerType);
Set<Method> methods = HandlerMethodSelector.selectMethods(userType, new MethodFilter() {
@Override
public boolean matches(Method method) {
return AnnotationUtils.findAnnotation(method, MessageMapping.class) != null;
}
});
for (Method method : methods) {
MessageMapping mapping = AnnotationUtils.findAnnotation(method, MessageMapping.class);
HandlerMethod handlerMethod = createHandlerMethod(handler, method);
this.handlerMethods.put(mapping, handlerMethod);
}
}
protected HandlerMethod createHandlerMethod(Object handler, Method method) {
HandlerMethod handlerMethod;
if (handler instanceof String) {
String beanName = (String) handler;
handlerMethod = new HandlerMethod(beanName, this.applicationContext, method);
}
else {
handlerMethod = new HandlerMethod(handler, method);
}
return handlerMethod;
}
protected HandlerMethod getHandlerMethod(String destination, MessageType messageType) {
for (MessageMapping mapping : this.handlerMethods.keySet()) {
boolean match = false;
for (String mappingDestination : mapping.value()) {
if (destination.equals(mappingDestination)) {
match = true;
break;
}
}
if (match && messageType.equals(mapping.messageType())) {
return this.handlerMethods.get(mapping);
}
}
return null;
}
@Override
protected void processSubscribe(StompMessage message, Object replyTo) {
handleMessage(message, replyTo, MessageType.SUBSCRIBE);
}
@Override
protected void processSend(StompMessage message) {
handleMessage(message, null, MessageType.SEND);
}
private void handleMessage(final StompMessage message, final Object replyTo, MessageType messageType) {
String destination = message.getHeaders().getDestination();
HandlerMethod match = getHandlerMethod(destination, messageType);
if (match == null) {
return;
}
HandlerMethod handlerMethod = match.createWithResolvedBean();
InvocableMessageHandlerMethod messageHandlerMethod = new InvocableMessageHandlerMethod(handlerMethod);
messageHandlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers);
try {
messageHandlerMethod.invoke(message, replyTo);
}
catch (Throwable e) {
// TODO: send error message, or add @ExceptionHandler-like capability
e.printStackTrace();
}
}
}
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import org.springframework.core.GenericTypeResolver;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.core.MethodParameter;
import org.springframework.core.ParameterNameDiscoverer;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.bind.WebDataBinder;
import org.springframework.web.bind.support.WebDataBinderFactory;
import org.springframework.web.messaging.stomp.StompMessage;
import org.springframework.web.method.HandlerMethod;
/**
* Invokes the handler method for a given message after resolving
* its method argument values through registered {@link MessageMethodArgumentResolver}s.
* <p>
* Argument resolution often requires a {@link WebDataBinder} for data binding or for type
* conversion. Use the {@link #setDataBinderFactory(WebDataBinderFactory)} property to
* supply a binder factory to pass to argument resolvers.
* <p>
* Use {@link #setMessageMethodArgumentResolvers(MessageMethodArgumentResolverComposite)}
* to customize the list of argument resolvers.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class InvocableMessageHandlerMethod extends HandlerMethod {
private MessageMethodArgumentResolverComposite argumentResolvers = new MessageMethodArgumentResolverComposite();
private ParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer();
/**
* Create an instance from a {@code HandlerMethod}.
*/
public InvocableMessageHandlerMethod(HandlerMethod handlerMethod) {
super(handlerMethod);
}
/**
* Constructs a new handler method with the given bean instance, method name and
* parameters.
*
* @param bean the object bean
* @param methodName the method name
* @param parameterTypes the method parameter types
* @throws NoSuchMethodException when the method cannot be found
*/
public InvocableMessageHandlerMethod(
Object bean, String methodName, Class<?>... parameterTypes) throws NoSuchMethodException {
super(bean, methodName, parameterTypes);
}
/**
* Set {@link MessageMethodArgumentResolver}s to use to use for resolving method
* argument values.
*/
public void setMessageMethodArgumentResolvers(MessageMethodArgumentResolverComposite argumentResolvers) {
this.argumentResolvers = argumentResolvers;
}
/**
* Set the ParameterNameDiscoverer for resolving parameter names when needed (e.g.
* default request attribute name).
* <p>
* Default is an
* {@link org.springframework.core.LocalVariableTableParameterNameDiscoverer}
* instance.
*/
public void setParameterNameDiscoverer(ParameterNameDiscoverer parameterNameDiscoverer) {
this.parameterNameDiscoverer = parameterNameDiscoverer;
}
/**
* TODO
*
* @exception Exception raised if no suitable argument resolver can be found, or the
* method raised an exception
*/
public final Object invoke(StompMessage message, Object replyTo) throws Exception {
Object[] args = getMethodArgumentValues(message, replyTo);
if (logger.isTraceEnabled()) {
StringBuilder builder = new StringBuilder("Invoking [");
builder.append(this.getMethod().getName()).append("] method with arguments ");
builder.append(Arrays.asList(args));
logger.trace(builder.toString());
}
Object returnValue = invoke(args);
if (logger.isTraceEnabled()) {
logger.trace("Method [" + this.getMethod().getName() + "] returned [" + returnValue + "]");
}
return returnValue;
}
/**
* Get the method argument values for the current request.
*/
private Object[] getMethodArgumentValues(StompMessage message, Object replyTo) throws Exception {
MethodParameter[] parameters = getMethodParameters();
Object[] args = new Object[parameters.length];
for (int i = 0; i < parameters.length; i++) {
MethodParameter parameter = parameters[i];
parameter.initParameterNameDiscovery(parameterNameDiscoverer);
GenericTypeResolver.resolveParameterType(parameter, getBean().getClass());
args[i] = resolveProvidedArgument(parameter);
if (args[i] != null) {
continue;
}
if (this.argumentResolvers.supportsParameter(parameter)) {
try {
args[i] = this.argumentResolvers.resolveArgument(parameter, message, replyTo);
continue;
} catch (Exception ex) {
if (logger.isTraceEnabled()) {
logger.trace(getArgumentResolutionErrorMessage("Error resolving argument", i), ex);
}
throw ex;
}
}
if (args[i] == null) {
String msg = getArgumentResolutionErrorMessage("No suitable resolver for argument", i);
throw new IllegalStateException(msg);
}
}
return args;
}
private String getArgumentResolutionErrorMessage(String message, int index) {
MethodParameter param = getMethodParameters()[index];
message += " [" + index + "] [type=" + param.getParameterType().getName() + "]";
return getDetailedErrorMessage(message);
}
/**
* Adds HandlerMethod details such as the controller type and method signature to the given error message.
* @param message error message to append the HandlerMethod details to
*/
protected String getDetailedErrorMessage(String message) {
StringBuilder sb = new StringBuilder(message).append("\n");
sb.append("HandlerMethod details: \n");
sb.append("Controller [").append(getBeanType().getName()).append("]\n");
sb.append("Method [").append(getBridgedMethod().toGenericString()).append("]\n");
return sb.toString();
}
/**
* Attempt to resolve a method parameter from the list of provided argument values.
*/
private Object resolveProvidedArgument(MethodParameter parameter, Object... providedArgs) {
if (providedArgs == null) {
return null;
}
for (Object providedArg : providedArgs) {
if (parameter.getParameterType().isInstance(providedArg)) {
return providedArg;
}
}
return null;
}
/**
* Invoke the handler method with the given argument values.
*/
private Object invoke(Object... args) throws Exception {
ReflectionUtils.makeAccessible(this.getBridgedMethod());
try {
return getBridgedMethod().invoke(getBean(), args);
}
catch (IllegalArgumentException e) {
String msg = getInvocationErrorMessage(e.getMessage(), args);
throw new IllegalArgumentException(msg, e);
}
catch (InvocationTargetException e) {
// Unwrap for HandlerExceptionResolvers ...
Throwable targetException = e.getTargetException();
if (targetException instanceof RuntimeException) {
throw (RuntimeException) targetException;
}
else if (targetException instanceof Error) {
throw (Error) targetException;
}
else if (targetException instanceof Exception) {
throw (Exception) targetException;
}
else {
String msg = getInvocationErrorMessage("Failed to invoke controller method", args);
throw new IllegalStateException(msg, targetException);
}
}
}
private String getInvocationErrorMessage(String message, Object[] resolvedArgs) {
StringBuilder sb = new StringBuilder(getDetailedErrorMessage(message));
sb.append("Resolved arguments: \n");
for (int i=0; i < resolvedArgs.length; i++) {
sb.append("[").append(i).append("] ");
if (resolvedArgs[i] == null) {
sb.append("[null] \n");
}
else {
sb.append("[type=").append(resolvedArgs[i].getClass().getName()).append("] ");
sb.append("[value=").append(resolvedArgs[i]).append("]\n");
}
}
return sb.toString();
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service;
import org.springframework.core.MethodParameter;
import org.springframework.web.messaging.stomp.StompMessage;
/**
* Strategy interface for resolving method parameters into argument values in
* the context of a given message.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface MessageMethodArgumentResolver {
/**
* Whether the given {@linkplain MethodParameter method parameter} is
* supported by this resolver.
*
* @param parameter the method parameter to check
* @return {@code true} if this resolver supports the supplied parameter;
* {@code false} otherwise
*/
boolean supportsParameter(MethodParameter parameter);
/**
* Resolves a method parameter into an argument value from a given message.
*
* @param parameter the method parameter to resolve. This parameter must
* have previously been passed to
* {@link #supportsParameter(org.springframework.core.MethodParameter)}
* and it must have returned {@code true}
* @param message
* @param replyTo
*
* @return the resolved argument value, or {@code null}.
*
* @throws Exception in case of errors with the preparation of argument values
*/
Object resolveArgument(MethodParameter parameter, StompMessage message, Object replyTo) throws Exception;
}
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.MethodParameter;
import org.springframework.util.Assert;
import org.springframework.web.messaging.stomp.StompMessage;
/**
* Resolves method parameters by delegating to a list of registered
* {@link MessageMethodArgumentResolver}. Previously resolved method parameters are cached
* for faster lookups.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class MessageMethodArgumentResolverComposite implements MessageMethodArgumentResolver {
protected final Log logger = LogFactory.getLog(getClass());
private final List<MessageMethodArgumentResolver> argumentResolvers =
new LinkedList<MessageMethodArgumentResolver>();
private final Map<MethodParameter, MessageMethodArgumentResolver> argumentResolverCache =
new ConcurrentHashMap<MethodParameter, MessageMethodArgumentResolver>(256);
/**
* Return a read-only list with the contained resolvers, or an empty list.
*/
public List<MessageMethodArgumentResolver> getResolvers() {
return Collections.unmodifiableList(this.argumentResolvers);
}
/**
* Whether the given {@linkplain MethodParameter method parameter} is supported by any registered
* {@link MessageMethodArgumentResolver}.
*/
@Override
public boolean supportsParameter(MethodParameter parameter) {
return getArgumentResolver(parameter) != null;
}
/**
* Iterate over registered {@link MessageMethodArgumentResolver}s and invoke the one that supports it.
* @exception IllegalStateException if no suitable {@link MessageMethodArgumentResolver} is found.
*/
@Override
public Object resolveArgument(MethodParameter parameter, StompMessage message, Object replyTo) throws Exception {
MessageMethodArgumentResolver resolver = getArgumentResolver(parameter);
Assert.notNull(resolver, "Unknown parameter type [" + parameter.getParameterType().getName() + "]");
return resolver.resolveArgument(parameter, message, replyTo);
}
/**
* Find a registered {@link MessageMethodArgumentResolver} that supports the given method parameter.
*/
private MessageMethodArgumentResolver getArgumentResolver(MethodParameter parameter) {
MessageMethodArgumentResolver result = this.argumentResolverCache.get(parameter);
if (result == null) {
for (MessageMethodArgumentResolver methodArgumentResolver : this.argumentResolvers) {
if (logger.isTraceEnabled()) {
logger.trace("Testing if argument resolver [" + methodArgumentResolver + "] supports [" +
parameter.getGenericParameterType() + "]");
}
if (methodArgumentResolver.supportsParameter(parameter)) {
result = methodArgumentResolver;
this.argumentResolverCache.put(parameter, result);
break;
}
}
}
return result;
}
/**
* Add the given {@link MessageMethodArgumentResolver}.
*/
public MessageMethodArgumentResolverComposite addResolver(MessageMethodArgumentResolver argumentResolver) {
this.argumentResolvers.add(argumentResolver);
return this;
}
/**
* Add the given {@link MessageMethodArgumentResolver}s.
*/
public MessageMethodArgumentResolverComposite addResolvers(
List<? extends MessageMethodArgumentResolver> argumentResolvers) {
if (argumentResolvers != null) {
for (MessageMethodArgumentResolver resolver : argumentResolvers) {
this.argumentResolvers.add(resolver);
}
}
return this;
}
}
......@@ -48,7 +48,7 @@ public class RelayStompService extends AbstractStompService {
private Map<String, RelaySession> relaySessions = new ConcurrentHashMap<String, RelaySession>();
private final StompMessageConverter converter = new StompMessageConverter();
private final StompMessageConverter messageConverter = new StompMessageConverter();
private final TaskExecutor taskExecutor;
......@@ -58,9 +58,10 @@ public class RelayStompService extends AbstractStompService {
this.taskExecutor = executor; // For now, a naive way to manage socket reading
}
protected void processConnect(StompMessage stompMessage, final Object replyTo) {
final String stompSessionId = stompMessage.getStompSessionId();
final String stompSessionId = stompMessage.getSessionId();
final RelaySession session = new RelaySession();
this.relaySessions.put(stompSessionId, session);
......@@ -80,19 +81,19 @@ public class RelayStompService extends AbstractStompService {
}
private void relayStompMessage(StompMessage stompMessage) {
RelaySession session = RelayStompService.this.relaySessions.get(stompMessage.getStompSessionId());
RelaySession session = RelayStompService.this.relaySessions.get(stompMessage.getSessionId());
Assert.notNull(session, "RelaySession not found");
try {
if (logger.isTraceEnabled()) {
logger.trace("Forwarding: " + stompMessage);
}
byte[] bytes = converter.fromStompMessage(stompMessage);
byte[] bytes = messageConverter.fromStompMessage(stompMessage);
session.getOutputStream().write(bytes);
session.getOutputStream().flush();
}
catch (Exception e) {
e.printStackTrace();
clearRelaySession(stompMessage.getStompSessionId());
clearRelaySession(stompMessage.getSessionId());
}
}
......@@ -209,7 +210,7 @@ public class RelayStompService extends AbstractStompService {
}
else if (b == 0x00) {
byte[] bytes = out.toByteArray();
StompMessage message = RelayStompService.this.converter.toStompMessage(bytes);
StompMessage message = RelayStompService.this.messageConverter.toStompMessage(bytes);
getReactor().notify(replyTo, Event.wrap(message));
out.reset();
}
......
......@@ -29,7 +29,7 @@ import reactor.Fn;
import reactor.core.Reactor;
import reactor.fn.Consumer;
import reactor.fn.Event;
import reactor.fn.Registration;
import reactor.fn.registry.Registration;
/**
......@@ -64,7 +64,7 @@ public class SimpleStompService extends AbstractStompService {
getReactor().notify(replyTo, Event.wrap(outMessage));
}
});
addSubscription(message.getStompSessionId(), registration);
addSubscription(message.getSessionId(), registration);
}
private void addSubscription(String sessionId, Registration<?> registration) {
......@@ -85,7 +85,7 @@ public class SimpleStompService extends AbstractStompService {
@Override
protected void processDisconnect(StompMessage message) {
removeSubscriptions(message.getStompSessionId());
removeSubscriptions(message.getSessionId());
}
@Override
......
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service.support;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.springframework.http.MediaType;
import org.springframework.util.CollectionUtils;
import org.springframework.web.messaging.converter.ContentTypeNotSupportedException;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.stomp.service.MessageMethodArgumentResolver;
import reactor.core.Reactor;
import reactor.util.Assert;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractPayloadSendingArgumentResolver implements MessageMethodArgumentResolver {
private final Reactor reactor;
private final List<MessageConverter<?>> converters;
public AbstractPayloadSendingArgumentResolver(Reactor reactor, List<MessageConverter<?>> converters) {
Assert.notNull(reactor, "reactor is required");
this.reactor = reactor;
this.converters = (converters != null) ? converters : new ArrayList<MessageConverter<?>>();
}
public Reactor getReactor() {
return this.reactor;
}
public List<MessageConverter<?>> getMessageConverters() {
return this.converters;
}
@SuppressWarnings("unchecked")
protected byte[] convertToPayload(Object content, MediaType contentType)
throws IOException, ContentTypeNotSupportedException {
if (content == null) {
return null;
}
Class<? extends Object> clazz = content.getClass();
if (byte[].class.equals(clazz)) {
return (byte[]) content;
}
else if (!CollectionUtils.isEmpty(this.converters)) {
for (MessageConverter converter : getMessageConverters()) {
if (converter.canConvertToPayload(clazz, contentType)) {
return converter.convertToPayload(content, contentType);
}
}
}
throw new ContentTypeNotSupportedException(contentType, clazz);
}
}
\ No newline at end of file
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service.support;
import java.util.ArrayList;
import java.util.List;
import org.springframework.core.MethodParameter;
import org.springframework.http.MediaType;
import org.springframework.web.messaging.MessageBody;
import org.springframework.web.messaging.converter.ContentTypeNotSupportedException;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.stomp.StompMessage;
import org.springframework.web.messaging.stomp.service.MessageMethodArgumentResolver;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class MessageBodyArgumentResolver implements MessageMethodArgumentResolver {
private final List<MessageConverter<?>> converters;
public MessageBodyArgumentResolver(List<MessageConverter<?>> converters) {
this.converters = (converters != null) ? converters : new ArrayList<MessageConverter<?>>();
}
@Override
public boolean supportsParameter(MethodParameter parameter) {
return true;
}
@SuppressWarnings("unchecked")
@Override
public Object resolveArgument(MethodParameter parameter, StompMessage message, Object replyTo)
throws Exception {
byte[] payload = message.getPayload();
Class<?> parameterType = parameter.getParameterType();
if (byte[].class.isAssignableFrom(parameterType)) {
return payload;
}
Object arg = null;
MessageBody annot = parameter.getParameterAnnotation(MessageBody.class);
MediaType contentType = message.getHeaders().getContentType();
if (annot == null || annot.required()) {
for (MessageConverter converter : this.converters) {
if (converter.canConvertFromPayload(parameterType, contentType)) {
return converter.convertFromPayload(parameterType, contentType, payload);
}
}
throw new ContentTypeNotSupportedException(contentType, parameterType);
}
return arg;
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service.support;
import java.io.IOException;
import java.util.List;
import org.springframework.core.MethodParameter;
import org.springframework.http.MediaType;
import org.springframework.web.messaging.MessageBroker;
import org.springframework.web.messaging.converter.ContentTypeNotSupportedException;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompHeaders;
import org.springframework.web.messaging.stomp.StompMessage;
import reactor.core.Reactor;
import reactor.fn.Event;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class MessageBrokerArgumentResolver extends AbstractPayloadSendingArgumentResolver {
public MessageBrokerArgumentResolver(Reactor reactor, List<MessageConverter<?>> converters) {
super(reactor, converters);
}
@Override
public boolean supportsParameter(MethodParameter parameter) {
return MessageBroker.class.equals(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, StompMessage message, Object replyTo) throws Exception {
return new DefaultMessageBroker(message.getSessionId());
}
private class DefaultMessageBroker implements MessageBroker {
private final String sessionId;
public DefaultMessageBroker(String sessionId) {
this.sessionId = sessionId;
}
@Override
public void send(String destination, Object content, MediaType contentType)
throws IOException, ContentTypeNotSupportedException {
StompHeaders headers = new StompHeaders();
headers.setDestination(destination);
headers.setContentType(contentType);
byte[] payload = convertToPayload(content, contentType);
if (payload != null) {
// TODO: set content-length
}
StompMessage message = new StompMessage(StompCommand.SEND, headers, payload);
message.setSessionId(this.sessionId);
getReactor().notify(StompCommand.SEND, Event.wrap(message));
}
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service.support;
import java.io.IOException;
import java.util.List;
import org.springframework.core.MethodParameter;
import org.springframework.http.MediaType;
import org.springframework.util.Assert;
import org.springframework.web.messaging.Subscription;
import org.springframework.web.messaging.converter.ContentTypeNotSupportedException;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompHeaders;
import org.springframework.web.messaging.stomp.StompMessage;
import reactor.core.Reactor;
import reactor.fn.Event;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SubscriptionArgumentResolver extends AbstractPayloadSendingArgumentResolver {
public SubscriptionArgumentResolver(Reactor reactor, List<MessageConverter<?>> converters) {
super(reactor, converters);
}
@Override
public boolean supportsParameter(MethodParameter parameter) {
return Subscription.class.equals(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, StompMessage message, Object replyTo) throws Exception {
Assert.isTrue(StompCommand.SUBSCRIBE.equals(message.getCommand()), "Not a subscribe command");
return new DefaultSubscription(message.getHeaders().getDestination(), replyTo);
}
private class DefaultSubscription implements Subscription {
private final String destination;
private final Object replyTo;
public DefaultSubscription(String destination, Object replyTo) {
this.destination = destination;
this.replyTo = replyTo;
}
@Override
public void reply(Object content) throws IOException, ContentTypeNotSupportedException {
reply(content, null);
}
@Override
public void reply(Object content, MediaType contentType)
throws IOException, ContentTypeNotSupportedException {
StompHeaders headers = new StompHeaders();
headers.setDestination(this.destination);
headers.setContentType(contentType);
byte[] payload = convertToPayload(content, contentType);
if (payload != null) {
// TODO: set content-length
}
StompMessage message = new StompMessage(StompCommand.MESSAGE, headers, payload);
getReactor().notify(this.replyTo, Event.wrap(message));
}
}
}
......@@ -35,7 +35,7 @@ import reactor.Fn;
import reactor.core.Reactor;
import reactor.fn.Consumer;
import reactor.fn.Event;
import reactor.fn.Registration;
import reactor.fn.registry.Registration;
/**
* @author Gary Russell
......@@ -62,19 +62,19 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
StompCommand command = message.getCommand();
if (StompCommand.CONNECT.equals(command) || StompCommand.STOMP.equals(command)) {
registerConnectionClosedCallback(session);
connect(session, message);
handleConnect(session, message);
}
else if (StompCommand.SUBSCRIBE.equals(command)) {
subscribe(session, message);
handleSubscribe(session, message);
}
else if (StompCommand.UNSUBSCRIBE.equals(command)) {
unsubscribe(session, message);
handleUnsubscribe(session, message);
}
else if (StompCommand.SEND.equals(command)) {
send(session, message);
handleSend(session, message);
}
else if (StompCommand.DISCONNECT.equals(command)) {
disconnect(session, message);
handleDisconnect(session, message);
}
else if (StompCommand.ACK.equals(command) || StompCommand.NACK.equals(command)) {
this.reactor.notify(command, Event.wrap(message));
......@@ -121,7 +121,7 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
}
}
protected void connect(final StompSession session, StompMessage stompMessage) throws IOException {
protected void handleConnect(final StompSession session, StompMessage stompMessage) throws IOException {
StompHeaders headers = new StompHeaders();
Set<String> acceptVersions = stompMessage.getHeaders().getAcceptVersion();
......@@ -171,7 +171,7 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
this.reactor.notify(StompCommand.CONNECT, Event.wrap(stompMessage, replyToKey));
}
protected void subscribe(final StompSession session, StompMessage message) {
protected void handleSubscribe(final StompSession session, StompMessage message) {
final String subscriptionId = message.getHeaders().getId();
String replyToKey = getSubscriptionReplyKey(session, subscriptionId);
......@@ -218,7 +218,7 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
list.add(registration);
}
protected void unsubscribe(StompSession session, StompMessage message) {
protected void handleUnsubscribe(StompSession session, StompMessage message) {
cancelRegistration(session, message.getHeaders().getId());
this.reactor.notify(StompCommand.UNSUBSCRIBE, Event.wrap(message));
}
......@@ -237,11 +237,11 @@ public class DefaultStompWebSocketHandler extends AbstractStompWebSocketHandler
}
}
protected void send(StompSession session, StompMessage stompMessage) {
protected void handleSend(StompSession session, StompMessage stompMessage) {
this.reactor.notify(StompCommand.SEND, Event.wrap(stompMessage));
}
protected void disconnect(StompSession session, StompMessage stompMessage) {
protected void handleDisconnect(StompSession session, StompMessage stompMessage) {
removeSubscriptions(session);
this.reactor.notify(StompCommand.DISCONNECT, Event.wrap(stompMessage));
}
......
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.socket;
import org.springframework.web.messaging.stomp.StompMessage;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface StompMessageInterceptor {
boolean handleConnect(StompMessage message);
boolean handleSubscribe(StompMessage message);
boolean handleUnsubscribe(StompMessage message);
StompMessage handleSend(StompMessage message);
void handleDisconnect();
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.stomp.service;
import org.junit.Before;
import org.junit.Test;
import org.springframework.stereotype.Controller;
import org.springframework.web.context.support.StaticWebApplicationContext;
import org.springframework.web.messaging.MessageMapping;
import org.springframework.web.messaging.MessageType;
import org.springframework.web.method.HandlerMethod;
import reactor.core.Environment;
import reactor.core.Reactor;
import static org.junit.Assert.*;
/**
* Test fixture for {@link AnnotationStompService}.
* @author Rossen Stoyanchev
*/
public class AnnotationStompServiceTests {
private AnnotationStompService service;
@Before
public void setup() {
StaticWebApplicationContext wac = new StaticWebApplicationContext();
wac.registerSingleton("controller", TestController.class);
Reactor reactor = new Reactor(new Environment());
this.service = new AnnotationStompService(reactor);
this.service.setApplicationContext(wac);
this.service.afterPropertiesSet();
}
@Test
public void noMatch() {
assertNull(this.service.getHandlerMethod("/nomatch", MessageType.CONNECT));
}
@Test
public void match() {
HandlerMethod handlerMethod = this.service.getHandlerMethod("/foo", MessageType.SUBSCRIBE);
assertNotNull(handlerMethod);
assertEquals("handleSubscribe", handlerMethod.getMethod().getName());
assertEquals("controller", handlerMethod.getBean());
}
@Controller
private static class TestController {
@MessageMapping(value="/foo", messageType=MessageType.SUBSCRIBE)
public void handleSubscribe() {
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册