/*
 * Copyright 2002-2016 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.web.reactive.socket.adapter;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.PongMessage;
import javax.websocket.Session;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketMessage.Type;

/**
 * Tomcat {@code WebSocketHandler} implementation adapting and
 * delegating to a Spring {@link WebSocketHandler}.
 * 
 * @author Violeta Georgieva
 * @since 5.0
 */
45
public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport {
46

R
Polish  
Rossen Stoyanchev 已提交
47
	private TomcatWebSocketSession session;
48

49

50 51
	public TomcatWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
			WebSocketHandler delegate) {
R
Polish  
Rossen Stoyanchev 已提交
52

53
		super(request, response, delegate);
54 55
	}

R
Polish  
Rossen Stoyanchev 已提交
56

57 58
	public Endpoint getEndpoint() {
		return new StandardEndpoint();
59 60
	}

61 62 63 64 65 66 67 68 69
	private TomcatWebSocketSession getSession() {
		return this.session;
	}


	private class StandardEndpoint extends Endpoint {

		@Override
		public void onOpen(Session session, EndpointConfig config) {
70 71
			TomcatWebSocketHandlerAdapter.this.session =
					new TomcatWebSocketSession(session, getUri(), getBufferFactory());
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87

			session.addMessageHandler(String.class, message -> {
				WebSocketMessage webSocketMessage = toMessage(message);
				getSession().handleMessage(webSocketMessage.getType(), webSocketMessage);
			});
			session.addMessageHandler(ByteBuffer.class, message -> {
				WebSocketMessage webSocketMessage = toMessage(message);
				getSession().handleMessage(webSocketMessage.getType(), webSocketMessage);
			});
			session.addMessageHandler(PongMessage.class, message -> {
				WebSocketMessage webSocketMessage = toMessage(message);
				getSession().handleMessage(webSocketMessage.getType(), webSocketMessage);
			});

			HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber();
			getDelegate().handle(TomcatWebSocketHandlerAdapter.this.session).subscribe(resultSubscriber);
88
		}
89 90 91 92

		private <T> WebSocketMessage toMessage(T message) {
			if (message instanceof String) {
				byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
93
				return new WebSocketMessage(Type.TEXT, getBufferFactory().wrap(bytes));
94 95 96
			}
			else if (message instanceof ByteBuffer) {
				DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message);
97
				return new WebSocketMessage(Type.BINARY, buffer);
98 99 100
			}
			else if (message instanceof PongMessage) {
				DataBuffer buffer = getBufferFactory().wrap(((PongMessage) message).getApplicationData());
101
				return new WebSocketMessage(Type.PONG, buffer);
102 103 104 105
			}
			else {
				throw new IllegalArgumentException("Unexpected message type: " + message);
			}
106 107
		}

108 109 110 111 112 113
		@Override
		public void onClose(Session session, CloseReason reason) {
			if (getSession() != null) {
				int code = reason.getCloseCode().getCode();
				getSession().handleClose(new CloseStatus(code, reason.getReasonPhrase()));
			}
114 115
		}

116 117 118 119 120
		@Override
		public void onError(Session session, Throwable exception) {
			if (getSession() != null) {
				getSession().handleError(exception);
			}
121 122 123
		}
	}

124 125 126 127 128 129 130 131 132 133 134 135 136 137
	private final class HandlerResultSubscriber implements Subscriber<Void> {

		@Override
		public void onSubscribe(Subscription subscription) {
			subscription.request(Long.MAX_VALUE);
		}

		@Override
		public void onNext(Void aVoid) {
			// no op
		}

		@Override
		public void onError(Throwable ex) {
138 139
			if (getSession() != null) {
				getSession().close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage()));
140 141 142 143 144
			}
		}

		@Override
		public void onComplete() {
145 146
			if (getSession() != null) {
				getSession().close();
147 148 149 150 151
			}
		}
	}

}