提交 69ef364e 编写于 作者: R Rossen Stoyanchev

Introduce messaging package

org.springframework.web.stomp is now
org.springframework.web.messaging.stomp

Also classes in the ~.stomp.server and ~.stomp.adapter packages have
been renamed.
上级 c67b6943
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.web.stomp;
package org.springframework.web.messaging.stomp;
/**
......
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.stomp;
package org.springframework.web.messaging.stomp;
import org.springframework.core.NestedRuntimeException;
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.web.stomp;
package org.springframework.web.messaging.stomp;
import java.io.Serializable;
import java.util.Collection;
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.web.stomp;
package org.springframework.web.messaging.stomp;
import java.nio.charset.Charset;
......
......@@ -13,13 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.stomp;
package org.springframework.web.messaging.stomp;
import java.io.IOException;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
......@@ -28,9 +27,15 @@ public interface StompSession {
String getId();
/**
* TODO...
* <p>
* If the message is a STOMP ERROR message, the session will also be closed.
*
*/
void sendMessage(StompMessage message) throws IOException;
/**
* Register a task to be invoked if the underlying connection is closed.
*/
void registerConnectionClosedCallback(Runnable task);
}
......@@ -14,20 +14,18 @@
* limitations under the License.
*/
package org.springframework.web.stomp.adapter;
package org.springframework.web.messaging.stomp.adapter;
import org.springframework.web.stomp.StompMessage;
import org.springframework.web.stomp.StompSession;
import org.springframework.web.messaging.stomp.StompMessage;
import org.springframework.web.messaging.stomp.StompSession;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface StompMessageProcessor {
public interface StompMessageHandler {
void processMessage(StompSession stompSession, StompMessage message);
void processConnectionClosed(StompSession stompSession);
void handleMessage(StompSession stompSession, StompMessage message);
}
......@@ -13,39 +13,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.stomp.adapter;
package org.springframework.web.messaging.stomp.adapter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.util.Assert;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompHeaders;
import org.springframework.web.messaging.stomp.StompMessage;
import org.springframework.web.messaging.stomp.StompSession;
import org.springframework.web.messaging.stomp.support.StompMessageConverter;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.adapter.TextWebSocketHandlerAdapter;
import org.springframework.web.stomp.StompCommand;
import org.springframework.web.stomp.StompHeaders;
import org.springframework.web.stomp.StompMessage;
import org.springframework.web.stomp.StompSession;
import org.springframework.web.stomp.support.StompMessageConverter;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class StompWebSocketHandler extends TextWebSocketHandlerAdapter {
private final StompMessageProcessor messageProcessor;
private final StompMessageHandler messageHandler;
private final StompMessageConverter messageConverter = new StompMessageConverter();
private final Map<String, StompSession> sessions = new ConcurrentHashMap<String, StompSession>();
private final Map<String, WebSocketStompSession> sessions = new ConcurrentHashMap<String, WebSocketStompSession>();
public StompWebSocketHandler(StompMessageProcessor messageProcessor) {
this.messageProcessor = messageProcessor;
public StompWebSocketHandler(StompMessageHandler messageHandler) {
this.messageHandler = messageHandler;
}
......@@ -68,7 +67,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter {
// TODO: validate size limits
// http://stomp.github.io/stomp-specification-1.2.html#Size_Limits
this.messageProcessor.processMessage(stompSession, stompMessage);
this.messageHandler.handleMessage(stompSession, stompMessage);
// TODO: send RECEIPT message if incoming message has "receipt" header
// http://stomp.github.io/stomp-specification-1.2.html#Header_receipt
......@@ -89,9 +88,9 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter {
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
StompSession stompSession = this.sessions.remove(session.getId());
WebSocketStompSession stompSession = this.sessions.remove(session.getId());
if (stompSession != null) {
this.messageProcessor.processConnectionClosed(stompSession);
stompSession.handleConnectionClosed();
}
}
......
......@@ -14,18 +14,20 @@
* limitations under the License.
*/
package org.springframework.web.stomp.adapter;
package org.springframework.web.messaging.stomp.adapter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.springframework.util.Assert;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompMessage;
import org.springframework.web.messaging.stomp.StompSession;
import org.springframework.web.messaging.stomp.support.StompMessageConverter;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.stomp.StompCommand;
import org.springframework.web.stomp.StompMessage;
import org.springframework.web.stomp.StompSession;
import org.springframework.web.stomp.support.StompMessageConverter;
/**
......@@ -40,6 +42,8 @@ public class WebSocketStompSession implements StompSession {
private final StompMessageConverter messageConverter;
private final List<Runnable> connectionClosedTasks = new ArrayList<Runnable>();
public WebSocketStompSession(WebSocketSession webSocketSession, StompMessageConverter messageConverter) {
Assert.notNull(webSocketSession, "webSocketSession is required");
......@@ -70,4 +74,19 @@ public class WebSocketStompSession implements StompSession {
}
}
public void registerConnectionClosedCallback(Runnable task) {
this.connectionClosedTasks.add(task);
}
public void handleConnectionClosed() {
for (Runnable task : this.connectionClosedTasks) {
try {
task.run();
}
catch (Throwable t) {
// ignore
}
}
}
}
\ No newline at end of file
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.web.stomp.server;
package org.springframework.web.messaging.stomp.server;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
......@@ -31,10 +31,10 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.web.stomp.StompCommand;
import org.springframework.web.stomp.StompHeaders;
import org.springframework.web.stomp.StompMessage;
import org.springframework.web.stomp.support.StompMessageConverter;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompHeaders;
import org.springframework.web.messaging.stomp.StompMessage;
import org.springframework.web.messaging.stomp.support.StompMessageConverter;
import reactor.Fn;
import reactor.core.Reactor;
......@@ -47,9 +47,9 @@ import reactor.util.Assert;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class RelayStompReactorService {
public class RelayStompService {
private static final Log logger = LogFactory.getLog(RelayStompReactorService.class);
private static final Log logger = LogFactory.getLog(RelayStompService.class);
private final Reactor reactor;
......@@ -61,7 +61,7 @@ public class RelayStompReactorService {
private final TaskExecutor taskExecutor;
public RelayStompReactorService(Reactor reactor, TaskExecutor executor) {
public RelayStompService(Reactor reactor, TaskExecutor executor) {
this.reactor = reactor;
this.taskExecutor = executor; // For now, a naively way to manage socket reading
......@@ -91,7 +91,7 @@ public class RelayStompReactorService {
}
private RelaySession getRelaySession(String stompSessionId) {
RelaySession session = RelayStompReactorService.this.relaySessions.get(stompSessionId);
RelaySession session = RelayStompService.this.relaySessions.get(stompSessionId);
Assert.notNull(session, "RelaySession not found");
return session;
}
......@@ -188,8 +188,8 @@ public class RelayStompReactorService {
}
else if (b == 0x00) {
byte[] bytes = out.toByteArray();
StompMessage message = RelayStompReactorService.this.converter.toStompMessage(bytes);
RelayStompReactorService.this.reactor.notify(replyTo, Fn.event(message));
StompMessage message = RelayStompService.this.converter.toStompMessage(bytes);
RelayStompService.this.reactor.notify(replyTo, Fn.event(message));
out.reset();
}
else {
......@@ -209,7 +209,7 @@ public class RelayStompReactorService {
StompHeaders headers = new StompHeaders();
headers.setMessage("Lost connection");
StompMessage errorMessage = new StompMessage(StompCommand.ERROR, headers);
RelayStompReactorService.this.reactor.notify(replyTo, Fn.event(errorMessage));
RelayStompService.this.reactor.notify(replyTo, Fn.event(errorMessage));
}
}
......
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.stomp.server;
package org.springframework.web.messaging.stomp.server;
import java.io.IOException;
import java.util.ArrayList;
......@@ -25,12 +25,12 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.web.stomp.StompCommand;
import org.springframework.web.stomp.StompException;
import org.springframework.web.stomp.StompHeaders;
import org.springframework.web.stomp.StompMessage;
import org.springframework.web.stomp.StompSession;
import org.springframework.web.stomp.adapter.StompMessageProcessor;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompException;
import org.springframework.web.messaging.stomp.StompHeaders;
import org.springframework.web.messaging.stomp.StompMessage;
import org.springframework.web.messaging.stomp.StompSession;
import org.springframework.web.messaging.stomp.adapter.StompMessageHandler;
import reactor.Fn;
import reactor.core.Reactor;
......@@ -43,24 +43,26 @@ import reactor.fn.Registration;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class ReactorServerStompMessageProcessor implements StompMessageProcessor {
public class ServerStompMessageHandler implements StompMessageHandler {
private static Log logger = LogFactory.getLog(ReactorServerStompMessageProcessor.class);
private static Log logger = LogFactory.getLog(ServerStompMessageHandler.class);
private final Reactor reactor;
private Map<String, List<Registration<?>>> registrationsBySession = new ConcurrentHashMap<String, List<Registration<?>>>();
private Map<String, List<Registration<?>>> registrationsBySession =
new ConcurrentHashMap<String, List<Registration<?>>>();
public ReactorServerStompMessageProcessor(Reactor reactor) {
public ServerStompMessageHandler(Reactor reactor) {
this.reactor = reactor;
}
public void processMessage(StompSession session, StompMessage message) {
public void handleMessage(StompSession session, StompMessage message) {
try {
StompCommand command = message.getCommand();
if (StompCommand.CONNECT.equals(command) || StompCommand.STOMP.equals(command)) {
registerConnectionClosedCallback(session);
connect(session, message);
}
else if (StompCommand.SUBSCRIBE.equals(command)) {
......@@ -92,6 +94,16 @@ public class ReactorServerStompMessageProcessor implements StompMessageProcessor
}
}
private void registerConnectionClosedCallback(final StompSession session) {
session.registerConnectionClosedCallback(new Runnable() {
@Override
public void run() {
removeSubscriptions(session);
reactor.notify("CONNECTION_CLOSED", Fn.event(session.getId()));
}
});
}
private void handleError(final StompSession session, Throwable t) {
logger.error("Terminating STOMP session due to failure to send message: ", t);
sendErrorMessage(session, t.getMessage());
......@@ -233,10 +245,4 @@ public class ReactorServerStompMessageProcessor implements StompMessageProcessor
return true;
}
@Override
public void processConnectionClosed(StompSession session) {
removeSubscriptions(session);
this.reactor.notify("CONNECTION_CLOSED", Fn.event(session.getId()));
}
}
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.web.stomp.server;
package org.springframework.web.messaging.stomp.server;
import java.util.ArrayList;
import java.util.List;
......@@ -23,9 +23,9 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.web.stomp.StompCommand;
import org.springframework.web.stomp.StompHeaders;
import org.springframework.web.stomp.StompMessage;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompHeaders;
import org.springframework.web.messaging.stomp.StompMessage;
import reactor.Fn;
import reactor.core.Reactor;
......@@ -38,16 +38,16 @@ import reactor.fn.Registration;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SimpleStompReactorService {
public class SimpleStompService {
private static final Log logger = LogFactory.getLog(SimpleStompReactorService.class);
private static final Log logger = LogFactory.getLog(SimpleStompService.class);
private final Reactor reactor;
private Map<String, List<Registration<?>>> subscriptionsBySession = new ConcurrentHashMap<String, List<Registration<?>>>();
public SimpleStompReactorService(Reactor reactor) {
public SimpleStompService(Reactor reactor) {
this.reactor = reactor;
this.reactor.on(Fn.$(StompCommand.SUBSCRIBE), new SubscribeConsumer());
this.reactor.on(Fn.$(StompCommand.SEND), new SendConsumer());
......@@ -85,7 +85,7 @@ public class SimpleStompReactorService {
logger.debug("Subscribe " + message);
}
Registration<?> registration = SimpleStompReactorService.this.reactor.on(
Registration<?> registration = SimpleStompService.this.reactor.on(
Fn.$("destination:" + message.getHeaders().getDestination()),
new Consumer<Event<StompMessage>>() {
@Override
......@@ -94,7 +94,7 @@ public class SimpleStompReactorService {
StompHeaders headers = new StompHeaders();
headers.setDestination(inMessage.getHeaders().getDestination());
StompMessage outMessage = new StompMessage(StompCommand.MESSAGE, headers, inMessage.getPayload());
SimpleStompReactorService.this.reactor.notify(event.getReplyTo(), Fn.event(outMessage));
SimpleStompService.this.reactor.notify(event.getReplyTo(), Fn.event(outMessage));
}
});
......@@ -110,7 +110,7 @@ public class SimpleStompReactorService {
logger.debug("Message received: " + message);
String destination = message.getHeaders().getDestination();
SimpleStompReactorService.this.reactor.notify("destination:" + destination, Fn.event(message));
SimpleStompService.this.reactor.notify("destination:" + destination, Fn.event(message));
}
}
......@@ -119,7 +119,7 @@ public class SimpleStompReactorService {
@Override
public void accept(Event<String> event) {
String sessionId = event.getData();
SimpleStompReactorService.this.removeSubscriptions(sessionId);
SimpleStompService.this.removeSubscriptions(sessionId);
}
}
......
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.stomp.support;
package org.springframework.web.messaging.stomp.support;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
......@@ -21,10 +21,10 @@ import java.util.List;
import java.util.Map.Entry;
import org.springframework.util.Assert;
import org.springframework.web.stomp.StompCommand;
import org.springframework.web.stomp.StompException;
import org.springframework.web.stomp.StompHeaders;
import org.springframework.web.stomp.StompMessage;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompException;
import org.springframework.web.messaging.stomp.StompHeaders;
import org.springframework.web.messaging.stomp.StompMessage;
/**
* @author Gary Russell
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册