AbstractHttpSockJsSession.java 10.4 KB
Newer Older
1
/*
 * Copyright 2002-2014 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16

17
package org.springframework.web.socket.sockjs.transport.session;
18 19

import java.io.IOException;
R
Rossen Stoyanchev 已提交
20
import java.net.InetSocketAddress;
R
Rossen Stoyanchev 已提交
21
import java.net.URI;
R
Rossen Stoyanchev 已提交
22
import java.security.Principal;
23
import java.util.Collections;
24
import java.util.List;
R
Rossen Stoyanchev 已提交
25
import java.util.Map;
26 27
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
28

R
Rossen Stoyanchev 已提交
29
import org.springframework.http.HttpHeaders;
30
import org.springframework.http.server.ServerHttpAsyncRequestControl;
31 32 33
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.util.Assert;
R
Rossen Stoyanchev 已提交
34
import org.springframework.web.socket.CloseStatus;
35
import org.springframework.web.socket.WebSocketExtension;
R
Rossen Stoyanchev 已提交
36
import org.springframework.web.socket.WebSocketHandler;
37 38
import org.springframework.web.socket.sockjs.SockJsException;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
39 40 41
import org.springframework.web.socket.sockjs.frame.SockJsFrame;
import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat;
import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
42 43

/**
 * An abstract base class for use with HTTP transport based SockJS sessions.
 *
 * @author Rossen Stoyanchev
 * @since 4.0
 */
49
public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
50

51
	private final Queue<String> messageCache;
52 53


54
	private volatile ServerHttpResponse response;
55

56
	private volatile ServerHttpAsyncRequestControl asyncRequestControl;
57

58
	private volatile SockJsFrameFormat frameFormat;
59

60
	private volatile boolean requestInitialized;
61

62

63
	private volatile URI uri;
R
Rossen Stoyanchev 已提交
64

65
	private volatile HttpHeaders handshakeHeaders;
R
Rossen Stoyanchev 已提交
66

67
	private volatile Principal principal;
R
Rossen Stoyanchev 已提交
68

69 70 71 72 73
	private volatile InetSocketAddress localAddress;

	private volatile InetSocketAddress remoteAddress;

	private volatile String acceptedProtocol;
R
Rossen Stoyanchev 已提交
74

75

R
Rossen Stoyanchev 已提交
76
	public AbstractHttpSockJsSession(String id, SockJsServiceConfig config,
77
			WebSocketHandler wsHandler, Map<String, Object> attributes) {
R
Rossen Stoyanchev 已提交
78

79
		super(id, config, wsHandler, attributes);
80
		this.messageCache = new LinkedBlockingQueue<String>(config.getHttpMessageCacheSize());
R
Rossen Stoyanchev 已提交
81 82 83
	}


R
Rossen Stoyanchev 已提交
84 85 86 87 88
	/**
	 * Return the URI recorded from the initial request of this session.
	 * The value is {@code null} until {@code handleInitialRequest} has stored it.
	 */
	@Override
	public URI getUri() {
		return this.uri;
	}

R
Rossen Stoyanchev 已提交
89 90 91 92 93 94 95 96
	/**
	 * Return the headers recorded from the initial request of this session.
	 * The value is {@code null} until {@code handleInitialRequest} has stored it.
	 */
	@Override
	public HttpHeaders getHandshakeHeaders() {
		return this.handshakeHeaders;
	}

	@Override
	public Principal getPrincipal() {
		return this.principal;
97 98
	}

R
Rossen Stoyanchev 已提交
99 100 101 102 103 104 105 106 107 108
	/**
	 * Return the local address recorded from the initial request of this session.
	 * The value is {@code null} until {@code handleInitialRequest} has stored it.
	 */
	@Override
	public InetSocketAddress getLocalAddress() {
		return this.localAddress;
	}

	/**
	 * Return the remote address recorded from the initial request of this session.
	 * The value is {@code null} until {@code handleInitialRequest} has stored it.
	 */
	@Override
	public InetSocketAddress getRemoteAddress() {
		return this.remoteAddress;
	}

109 110 111 112 113 114 115
	/**
	 * Unlike WebSocket where sub-protocol negotiation is part of the
	 * initial handshake, in HTTP transports the same negotiation must
	 * be emulated and the selected protocol set through this setter.
	 * @param protocol the sub-protocol to set
	 */
	public void setAcceptedProtocol(String protocol) {
R
Rossen Stoyanchev 已提交
116
		this.acceptedProtocol = protocol;
117 118 119 120 121 122
	}

	/**
	 * Return the selected sub-protocol to use.
	 */
	public String getAcceptedProtocol() {
R
Rossen Stoyanchev 已提交
123
		return this.acceptedProtocol;
124 125
	}

126 127 128 129 130 131
	/**
	 * Return the WebSocket extensions for this session. This implementation
	 * always returns an empty list.
	 */
	@Override
	public List<WebSocketExtension> getExtensions() {
		return Collections.emptyList();
	}


132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
	/**
	 * Handle the first HTTP request, i.e. the one that starts a SockJS session.
	 * Write a prelude to the response (if needed), send the SockJS "open" frame
	 * to indicate to the client the session is opened, and invoke the
	 * delegate WebSocketHandler to provide it with the newly opened session.
	 * <p>
	 * The "xhr" and "jsonp" (polling-based) transports completes the initial request
	 * as soon as the open frame is sent. Following that the client should start a
	 * successive polling request within the same SockJS session.
	 * <p>
	 * The "xhr_streaming", "eventsource", and "htmlfile" transports are streaming
	 * based and will leave the initial request open in order to stream one or
	 * more messages. However, even streaming based transports eventually recycle
	 * the long running request, after a certain number of bytes have been streamed
	 * (128K by default), and allow the client to start a successive request within
	 * the same SockJS session.
	 *
	 * @param request the current request
	 * @param response the current response
	 * @param frameFormat the transport-specific SocksJS frame format to use
	 *
	 * @see #handleSuccessiveRequest(org.springframework.http.server.ServerHttpRequest, org.springframework.http.server.ServerHttpResponse, org.springframework.web.socket.sockjs.frame.SockJsFrameFormat)
	 */
155
	public void handleInitialRequest(ServerHttpRequest request, ServerHttpResponse response,
156
			SockJsFrameFormat frameFormat) throws SockJsException {
157

158 159 160 161 162 163 164 165
		initRequest(request, response, frameFormat);

		this.uri = request.getURI();
		this.handshakeHeaders = request.getHeaders();
		this.principal = request.getPrincipal();
		this.localAddress = request.getLocalAddress();
		this.remoteAddress = request.getRemoteAddress();

166
		try {
167
			writePrelude(request, response);
168 169
			writeFrame(SockJsFrame.openFrame());
		}
170 171 172
		catch (Throwable ex) {
			tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
			throw new SockJsTransportFailureException("Failed to send \"open\" frame", getId(), ex);
173
		}
R
Rossen Stoyanchev 已提交
174

R
Rossen Stoyanchev 已提交
175
		try {
176
			this.requestInitialized = true;
R
Rossen Stoyanchev 已提交
177 178
			delegateConnectionEstablished();
		}
179 180
		catch (Throwable ex) {
			throw new SockJsException("Unhandled exception from WebSocketHandler", getId(), ex);
R
Rossen Stoyanchev 已提交
181
		}
182 183
	}

184 185 186 187 188 189 190 191 192
	private void initRequest(ServerHttpRequest request, ServerHttpResponse response,
			SockJsFrameFormat frameFormat) {

		Assert.notNull(request, "Request must not be null");
		Assert.notNull(response, "Response must not be null");
		Assert.notNull(frameFormat, "SockJsFrameFormat must not be null");

		this.response = response;
		this.frameFormat = frameFormat;
193
		this.asyncRequestControl = request.getAsyncRequestControl(response);
194 195
	}

196
	protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException {
197 198
	}

199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
	/**
	 * Handle all HTTP requests part of the same SockJS session except for the very
	 * first, initial request. Write a prelude (if needed) and keep the request
	 * open and ready to send a message from the server to the client.
	 * <p>
	 * The "xhr" and "jsonp" (polling-based) transports completes the request when
	 * the next message is sent, which could be an array of messages cached during
	 * the time between successive requests, or it could be a heartbeat message
	 * sent if no other messages were sent (by default within 25 seconds).
	 * <p>
	 * The "xhr_streaming", "eventsource", and "htmlfile" transports are streaming
	 * based and will leave the request open longer in order to stream messages over
	 * a period of time. However, even streaming based transports eventually recycle
	 * the long running request, after a certain number of bytes have been streamed
	 * (128K by default), and allow the client to start a successive request within
	 * the same SockJS session.
	 *
	 * @param request the current request
	 * @param response the current response
	 * @param frameFormat the transport-specific SocksJS frame format to use
	 *
	 * @see #handleInitialRequest(org.springframework.http.server.ServerHttpRequest, org.springframework.http.server.ServerHttpResponse, org.springframework.web.socket.sockjs.frame.SockJsFrameFormat)
	 */
222
	public void handleSuccessiveRequest(ServerHttpRequest request,
223
			ServerHttpResponse response, SockJsFrameFormat frameFormat) throws SockJsException {
224

225 226
		initRequest(request, response, frameFormat);
		try {
227
			writePrelude(request, response);
228 229 230 231 232 233 234 235 236
		}
		catch (Throwable ex) {
			tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
			throw new SockJsTransportFailureException("Failed to send \"open\" frame", getId(), ex);
		}
		startAsyncRequest();
	}

	protected void startAsyncRequest() throws SockJsException {
237
		try {
238
			this.asyncRequestControl.start(-1);
239
			this.requestInitialized = true;
240 241 242
			scheduleHeartbeat();
			tryFlushCache();
		}
243 244 245
		catch (Throwable ex) {
			tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
			throw new SockJsTransportFailureException("Failed to flush messages", getId(), ex);
246
		}
247
	}
248

249
	@Override
250 251 252
	public boolean isActive() {
		ServerHttpAsyncRequestControl control = this.asyncRequestControl;
		return (control != null && !control.isCompleted());
253 254
	}

255
	protected Queue<String> getMessageCache() {
256 257 258
		return this.messageCache;
	}

259 260 261 262
	/**
	 * Return the response of the current request, or {@code null} when there
	 * is none (before the first request, or after {@link #resetRequest()}).
	 */
	protected ServerHttpResponse getResponse() {
		return this.response;
	}

263
	@Override
264
	protected final void sendMessageInternal(String message) throws SockJsTransportFailureException {
265
		this.messageCache.add(message);
266
		tryFlushCache();
267 268
	}

269
	private void tryFlushCache() throws SockJsTransportFailureException {
270 271 272 273 274 275 276
		if (this.messageCache.isEmpty()) {
			logger.trace("Nothing to flush");
			return;
		}
		if (logger.isTraceEnabled()) {
			logger.trace(this.messageCache.size() + " message(s) to flush");
		}
277
		if (isActive() && this.requestInitialized) {
278
			logger.trace("Flushing messages");
279
			flushCache();
280
		}
281 282 283 284 285
		else {
			if (logger.isTraceEnabled()) {
				logger.trace("Not ready to flush");
			}
		}
286 287 288 289 290
	}

	/**
	 * Only called if the connection is currently active
	 */
291
	protected abstract void flushCache() throws SockJsTransportFailureException;
292

293
	@Override
294
	protected void disconnect(CloseStatus status) {
295 296 297
		resetRequest();
	}

298 299 300
	protected void resetRequest() {

		this.requestInitialized = false;
301
		updateLastActiveTime();
302 303 304 305 306 307 308 309 310 311 312

		if (isActive()) {
			ServerHttpAsyncRequestControl control = this.asyncRequestControl;
			if (control.isStarted()) {
				try {
					logger.debug("Completing asynchronous request");
					control.complete();
				}
				catch (Throwable ex) {
					logger.error("Failed to complete request: " + ex.getMessage());
				}
313 314
			}
		}
315

316
		this.response = null;
317
		this.asyncRequestControl = null;
318 319
	}

320
	@Override
321
	protected void writeFrameInternal(SockJsFrame frame) throws IOException {
322
		if (isActive()) {
323 324 325 326
			frame = this.frameFormat.format(frame);
			if (logger.isTraceEnabled()) {
				logger.trace("Writing " + frame);
			}
327
			getResponse().getBody().write(frame.getContentBytes());
328 329 330 331
		}
	}

}