AbstractHttpSockJsSession.java 10.8 KB
Newer Older
1
/*
2
 * Copyright 2002-2014 the original author or authors.
3 4 5 6 7 8 9 10 11 12 13 14 15
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16

17
package org.springframework.web.socket.sockjs.transport.session;
18 19

import java.io.IOException;
R
Rossen Stoyanchev 已提交
20
import java.net.InetSocketAddress;
R
Rossen Stoyanchev 已提交
21
import java.net.URI;
R
Rossen Stoyanchev 已提交
22
import java.security.Principal;
23
import java.util.Collections;
24
import java.util.List;
R
Rossen Stoyanchev 已提交
25
import java.util.Map;
26 27
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
28

R
Rossen Stoyanchev 已提交
29
import org.springframework.http.HttpHeaders;
30
import org.springframework.http.server.ServerHttpAsyncRequestControl;
31 32 33
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.util.Assert;
R
Rossen Stoyanchev 已提交
34
import org.springframework.web.socket.CloseStatus;
35
import org.springframework.web.socket.WebSocketExtension;
R
Rossen Stoyanchev 已提交
36
import org.springframework.web.socket.WebSocketHandler;
37 38
import org.springframework.web.socket.sockjs.SockJsException;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
39 40 41
import org.springframework.web.socket.sockjs.frame.SockJsFrame;
import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat;
import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
42 43

/**
R
Rossen Stoyanchev 已提交
44
 * An abstract base class for use with HTTP transport based SockJS sessions.
45 46 47 48
 *
 * @author Rossen Stoyanchev
 * @since 4.0
 */
49
public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
50 51


52
	private volatile URI uri;
R
Rossen Stoyanchev 已提交
53

54
	private volatile HttpHeaders handshakeHeaders;
R
Rossen Stoyanchev 已提交
55

56
	private volatile Principal principal;
R
Rossen Stoyanchev 已提交
57

58 59 60 61 62
	private volatile InetSocketAddress localAddress;

	private volatile InetSocketAddress remoteAddress;

	private volatile String acceptedProtocol;
R
Rossen Stoyanchev 已提交
63

64

R
Polish  
Rossen Stoyanchev 已提交
65 66 67 68 69 70 71 72 73 74 75 76
	private volatile ServerHttpResponse response;

	private volatile ServerHttpAsyncRequestControl asyncRequestControl;

	private volatile SockJsFrameFormat frameFormat;

	private volatile boolean requestInitialized;


	private final Queue<String> messageCache;


R
Rossen Stoyanchev 已提交
77
	public AbstractHttpSockJsSession(String id, SockJsServiceConfig config,
78
			WebSocketHandler wsHandler, Map<String, Object> attributes) {
R
Rossen Stoyanchev 已提交
79

80
		super(id, config, wsHandler, attributes);
81
		this.messageCache = new LinkedBlockingQueue<String>(config.getHttpMessageCacheSize());
R
Rossen Stoyanchev 已提交
82 83 84
	}


R
Rossen Stoyanchev 已提交
85 86 87 88 89
	@Override
	public URI getUri() {
		return this.uri;
	}

R
Rossen Stoyanchev 已提交
90 91 92 93 94 95 96 97
	@Override
	public HttpHeaders getHandshakeHeaders() {
		return this.handshakeHeaders;
	}

	@Override
	public Principal getPrincipal() {
		return this.principal;
98 99
	}

R
Rossen Stoyanchev 已提交
100 101 102 103 104 105 106 107 108 109
	@Override
	public InetSocketAddress getLocalAddress() {
		return this.localAddress;
	}

	@Override
	public InetSocketAddress getRemoteAddress() {
		return this.remoteAddress;
	}

110
	/**
R
Polish  
Rossen Stoyanchev 已提交
111 112 113
	 * Unlike WebSocket where sub-protocol negotiation is part of the initial
	 * handshake, in HTTP transports the same negotiation must be emulated and
	 * the selected protocol set through this setter.
114 115 116
	 * @param protocol the sub-protocol to set
	 */
	public void setAcceptedProtocol(String protocol) {
R
Rossen Stoyanchev 已提交
117
		this.acceptedProtocol = protocol;
118 119 120 121 122 123
	}

	/**
	 * Return the selected sub-protocol to use.
	 */
	public String getAcceptedProtocol() {
R
Rossen Stoyanchev 已提交
124
		return this.acceptedProtocol;
125 126
	}

R
Polish  
Rossen Stoyanchev 已提交
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
	/**
	 * Return response for the current request, or {@code null} if between requests.
	 */
	protected ServerHttpResponse getResponse() {
		return this.response;
	}

	/**
	 * Return the SockJS buffer for messages stored transparently between polling
	 * requests. If the polling request takes longer than 5 seconds, the session
	 * will be closed.
	 *
	 * @see org.springframework.web.socket.sockjs.transport.TransportHandlingSockJsService
	 */
	protected Queue<String> getMessageCache() {
		return this.messageCache;
	}

	@Override
	public boolean isActive() {
		ServerHttpAsyncRequestControl control = this.asyncRequestControl;
		return (control != null && !control.isCompleted());
	}

151 152 153 154 155 156
	@Override
	public List<WebSocketExtension> getExtensions() {
		return Collections.emptyList();
	}


157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
	/**
	 * Handle the first HTTP request, i.e. the one that starts a SockJS session.
	 * Write a prelude to the response (if needed), send the SockJS "open" frame
	 * to indicate to the client the session is opened, and invoke the
	 * delegate WebSocketHandler to provide it with the newly opened session.
	 * <p>
	 * The "xhr" and "jsonp" (polling-based) transports completes the initial request
	 * as soon as the open frame is sent. Following that the client should start a
	 * successive polling request within the same SockJS session.
	 * <p>
	 * The "xhr_streaming", "eventsource", and "htmlfile" transports are streaming
	 * based and will leave the initial request open in order to stream one or
	 * more messages. However, even streaming based transports eventually recycle
	 * the long running request, after a certain number of bytes have been streamed
	 * (128K by default), and allow the client to start a successive request within
	 * the same SockJS session.
	 *
	 * @param request the current request
	 * @param response the current response
	 * @param frameFormat the transport-specific SocksJS frame format to use
	 *
	 * @see #handleSuccessiveRequest(org.springframework.http.server.ServerHttpRequest, org.springframework.http.server.ServerHttpResponse, org.springframework.web.socket.sockjs.frame.SockJsFrameFormat)
	 */
180
	public void handleInitialRequest(ServerHttpRequest request, ServerHttpResponse response,
181
			SockJsFrameFormat frameFormat) throws SockJsException {
182

183 184 185 186 187 188 189 190
		initRequest(request, response, frameFormat);

		this.uri = request.getURI();
		this.handshakeHeaders = request.getHeaders();
		this.principal = request.getPrincipal();
		this.localAddress = request.getLocalAddress();
		this.remoteAddress = request.getRemoteAddress();

191
		try {
192
			writePrelude(request, response);
193 194
			writeFrame(SockJsFrame.openFrame());
		}
195 196 197
		catch (Throwable ex) {
			tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
			throw new SockJsTransportFailureException("Failed to send \"open\" frame", getId(), ex);
198
		}
R
Rossen Stoyanchev 已提交
199

R
Rossen Stoyanchev 已提交
200
		try {
201
			this.requestInitialized = true;
R
Rossen Stoyanchev 已提交
202 203
			delegateConnectionEstablished();
		}
204 205
		catch (Throwable ex) {
			throw new SockJsException("Unhandled exception from WebSocketHandler", getId(), ex);
R
Rossen Stoyanchev 已提交
206
		}
207 208
	}

209 210 211 212 213 214 215 216 217
	private void initRequest(ServerHttpRequest request, ServerHttpResponse response,
			SockJsFrameFormat frameFormat) {

		Assert.notNull(request, "Request must not be null");
		Assert.notNull(response, "Response must not be null");
		Assert.notNull(frameFormat, "SockJsFrameFormat must not be null");

		this.response = response;
		this.frameFormat = frameFormat;
218
		this.asyncRequestControl = request.getAsyncRequestControl(response);
219 220
	}

221
	protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException {
222 223
	}

224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
	/**
	 * Handle all HTTP requests part of the same SockJS session except for the very
	 * first, initial request. Write a prelude (if needed) and keep the request
	 * open and ready to send a message from the server to the client.
	 * <p>
	 * The "xhr" and "jsonp" (polling-based) transports completes the request when
	 * the next message is sent, which could be an array of messages cached during
	 * the time between successive requests, or it could be a heartbeat message
	 * sent if no other messages were sent (by default within 25 seconds).
	 * <p>
	 * The "xhr_streaming", "eventsource", and "htmlfile" transports are streaming
	 * based and will leave the request open longer in order to stream messages over
	 * a period of time. However, even streaming based transports eventually recycle
	 * the long running request, after a certain number of bytes have been streamed
	 * (128K by default), and allow the client to start a successive request within
	 * the same SockJS session.
	 *
	 * @param request the current request
	 * @param response the current response
	 * @param frameFormat the transport-specific SocksJS frame format to use
	 *
	 * @see #handleInitialRequest(org.springframework.http.server.ServerHttpRequest, org.springframework.http.server.ServerHttpResponse, org.springframework.web.socket.sockjs.frame.SockJsFrameFormat)
	 */
247
	public void handleSuccessiveRequest(ServerHttpRequest request,
248
			ServerHttpResponse response, SockJsFrameFormat frameFormat) throws SockJsException {
249

250 251
		initRequest(request, response, frameFormat);
		try {
252
			writePrelude(request, response);
253 254 255 256 257 258 259 260 261
		}
		catch (Throwable ex) {
			tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
			throw new SockJsTransportFailureException("Failed to send \"open\" frame", getId(), ex);
		}
		startAsyncRequest();
	}

	protected void startAsyncRequest() throws SockJsException {
262
		try {
263
			this.asyncRequestControl.start(-1);
264
			this.requestInitialized = true;
265 266 267
			scheduleHeartbeat();
			tryFlushCache();
		}
268 269 270
		catch (Throwable ex) {
			tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
			throw new SockJsTransportFailureException("Failed to flush messages", getId(), ex);
271
		}
272
	}
273

274
	@Override
275
	protected final void sendMessageInternal(String message) throws SockJsTransportFailureException {
276
		this.messageCache.add(message);
277
		tryFlushCache();
278 279
	}

280
	private void tryFlushCache() throws SockJsTransportFailureException {
281 282 283 284 285 286 287
		if (this.messageCache.isEmpty()) {
			logger.trace("Nothing to flush");
			return;
		}
		if (logger.isTraceEnabled()) {
			logger.trace(this.messageCache.size() + " message(s) to flush");
		}
288
		if (isActive() && this.requestInitialized) {
289
			logger.trace("Flushing messages");
290
			flushCache();
291
		}
292 293 294 295 296
		else {
			if (logger.isTraceEnabled()) {
				logger.trace("Not ready to flush");
			}
		}
297 298 299 300 301
	}

	/**
	 * Only called if the connection is currently active
	 */
302
	protected abstract void flushCache() throws SockJsTransportFailureException;
303

304
	@Override
305
	protected void disconnect(CloseStatus status) {
306 307 308
		resetRequest();
	}

309 310 311
	protected void resetRequest() {

		this.requestInitialized = false;
312
		updateLastActiveTime();
313 314 315 316 317 318 319 320 321 322 323

		if (isActive()) {
			ServerHttpAsyncRequestControl control = this.asyncRequestControl;
			if (control.isStarted()) {
				try {
					logger.debug("Completing asynchronous request");
					control.complete();
				}
				catch (Throwable ex) {
					logger.error("Failed to complete request: " + ex.getMessage());
				}
324 325
			}
		}
326

327
		this.response = null;
328
		this.asyncRequestControl = null;
329 330
	}

331
	@Override
332
	protected void writeFrameInternal(SockJsFrame frame) throws IOException {
333
		if (isActive()) {
334 335 336 337
			frame = this.frameFormat.format(frame);
			if (logger.isTraceEnabled()) {
				logger.trace("Writing " + frame);
			}
338
			getResponse().getBody().write(frame.getContentBytes());
339 340 341 342
		}
	}

}