UndertowServerHttpRequest.java 9.2 KB
Newer Older
1
/*
2
 * Copyright 2002-2018 the original author or authors.
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.http.server.reactive;

19
import java.io.IOException;
20 21
import java.io.InputStream;
import java.io.OutputStream;
22
import java.net.InetSocketAddress;
23
import java.net.URI;
24
import java.nio.ByteBuffer;
25
import java.util.function.IntPredicate;
26
import javax.net.ssl.SSLSession;
27

28
import io.undertow.connector.ByteBufferPool;
29
import io.undertow.connector.PooledByteBuffer;
30
import io.undertow.server.HttpServerExchange;
31
import io.undertow.server.handlers.Cookie;
32
import io.undertow.util.HeaderValues;
33
import org.xnio.channels.StreamSourceChannel;
34
import reactor.core.publisher.Flux;
35

36
import org.springframework.core.io.buffer.DataBuffer;
37
import org.springframework.core.io.buffer.DataBufferFactory;
38
import org.springframework.core.io.buffer.DataBufferUtils;
39
import org.springframework.core.io.buffer.PooledDataBuffer;
40
import org.springframework.http.HttpCookie;
41
import org.springframework.http.HttpHeaders;
42
import org.springframework.lang.Nullable;
43
import org.springframework.util.Assert;
R
Rossen Stoyanchev 已提交
44
import org.springframework.util.LinkedMultiValueMap;
R
Rossen Stoyanchev 已提交
45
import org.springframework.util.MultiValueMap;
46
import org.springframework.util.StringUtils;
47 48

/**
49
 * Adapt {@link ServerHttpRequest} to the Undertow {@link HttpServerExchange}.
50
 *
51 52
 * @author Marek Hawrylczak
 * @author Rossen Stoyanchev
53
 * @since 5.0
54
 */
55
class UndertowServerHttpRequest extends AbstractServerHttpRequest {
56 57 58

	private final HttpServerExchange exchange;

59
	private final RequestBodyPublisher body;
60

61

62
	public UndertowServerHttpRequest(HttpServerExchange exchange, DataBufferFactory bufferFactory) {
63
		super(initUri(exchange), "", initHeaders(exchange));
64
		this.exchange = exchange;
65 66
		this.body = new RequestBodyPublisher(exchange, bufferFactory);
		this.body.registerListeners(exchange);
67 68
	}

69
	private static URI initUri(HttpServerExchange exchange) {
70
		Assert.notNull(exchange, "HttpServerExchange is required.");
71
		String requestURL = exchange.getRequestURL();
72
		String query = exchange.getQueryString();
73 74
		String requestUriAndQuery = StringUtils.isEmpty(query) ? requestURL : requestURL + "?" + query;
		return URI.create(requestUriAndQuery);
75 76 77 78 79 80 81 82 83 84
	}

	private static HttpHeaders initHeaders(HttpServerExchange exchange) {
		HttpHeaders headers = new HttpHeaders();
		for (HeaderValues values : exchange.getRequestHeaders()) {
			headers.put(values.getHeaderName().toString(), values);
		}
		return headers;
	}

85
	@Override
A
Arjen Poutsma 已提交
86
	public String getMethodValue() {
87
		return this.exchange.getRequestMethod().toString();
88 89
	}

90
	@Override
R
Rossen Stoyanchev 已提交
91 92
	protected MultiValueMap<String, HttpCookie> initCookies() {
		MultiValueMap<String, HttpCookie> cookies = new LinkedMultiValueMap<>();
93 94
		for (String name : this.exchange.getRequestCookies().keySet()) {
			Cookie cookie = this.exchange.getRequestCookies().get(name);
R
Rossen Stoyanchev 已提交
95 96
			HttpCookie httpCookie = new HttpCookie(name, cookie.getValue());
			cookies.add(name, httpCookie);
97
		}
R
Rossen Stoyanchev 已提交
98
		return cookies;
99 100
	}

101
	@Override
102 103
	public InetSocketAddress getRemoteAddress() {
		return this.exchange.getSourceAddress();
104 105
	}

106 107 108 109 110 111 112 113 114 115
	@Nullable
	@Override
	protected SslInfo initSslInfo() {
		SSLSession session = this.exchange.getConnection().getSslSession();
		if (session != null) {
			return new DefaultSslInfo(session);
		}
		return null;
	}

116
	@Override
117
	public Flux<DataBuffer> getBody() {
118
		return Flux.from(this.body);
119 120
	}

121 122 123 124 125 126
	@SuppressWarnings("unchecked")
	@Override
	public <T> T getNativeRequest() {
		return (T) this.exchange;
	}

127

128
	private static class RequestBodyPublisher extends AbstractListenerReadPublisher<DataBuffer> {
129

130
		private final StreamSourceChannel channel;
131

132
		private final DataBufferFactory bufferFactory;
133

134 135
		private final ByteBufferPool byteBufferPool;

136 137 138
		public RequestBodyPublisher(HttpServerExchange exchange, DataBufferFactory bufferFactory) {
			this.channel = exchange.getRequestChannel();
			this.bufferFactory = bufferFactory;
139
			this.byteBufferPool = exchange.getConnection().getByteBufferPool();
140 141
		}

142
		private void registerListeners(HttpServerExchange exchange) {
143 144 145 146
			exchange.addExchangeCompleteListener((ex, next) -> {
				onAllDataRead();
				next.proceed();
			});
147 148
			this.channel.getReadSetter().set(c -> onDataAvailable());
			this.channel.getCloseSetter().set(c -> onAllDataRead());
149
			this.channel.resumeReads();
150 151 152 153
		}

		@Override
		protected void checkOnDataAvailable() {
154 155 156
			this.channel.resumeReads();
			// We are allowed to try, it will return null if data is not available
			onDataAvailable();
157 158 159
		}

		@Override
160
		protected void readingPaused() {
161 162 163
			this.channel.suspendReads();
		}

164
		@Override
165
		@Nullable
166
		protected DataBuffer read() throws IOException {
167 168 169 170
			PooledByteBuffer pooledByteBuffer = this.byteBufferPool.allocate();
			boolean release = true;
			try {
				ByteBuffer byteBuffer = pooledByteBuffer.getBuffer();
171

172 173
				int read = this.channel.read(byteBuffer);
				if (logger.isTraceEnabled()) {
R
Rossen Stoyanchev 已提交
174
					logger.trace("Channel read returned " + read + (read != -1 ? " bytes" : ""));
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
				}

				if (read > 0) {
					byteBuffer.flip();
					DataBuffer dataBuffer = this.bufferFactory.wrap(byteBuffer);
					release = false;
					return new UndertowDataBuffer(dataBuffer, pooledByteBuffer);
				}
				else if (read == -1) {
					onAllDataRead();
				}
				return null;
			} finally {
				if (release && pooledByteBuffer.isOpen()) {
					pooledByteBuffer.close();
				}
191
			}
192 193 194 195 196 197 198 199 200 201 202 203 204
		}

	}

	private static class UndertowDataBuffer implements PooledDataBuffer {

		private final DataBuffer dataBuffer;

		private final PooledByteBuffer pooledByteBuffer;

		public UndertowDataBuffer(DataBuffer dataBuffer, PooledByteBuffer pooledByteBuffer) {
			this.dataBuffer = dataBuffer;
			this.pooledByteBuffer = pooledByteBuffer;
205 206
		}

207
		@Override
208 209 210 211 212 213
		public PooledDataBuffer retain() {
			return this;
		}

		@Override
		public boolean release() {
214 215 216 217 218 219 220 221
			boolean result;
			try {
				result = DataBufferUtils.release(this.dataBuffer);
			}
			finally {
				this.pooledByteBuffer.close();
			}
			return result && this.pooledByteBuffer.isOpen();
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
		}

		@Override
		public DataBufferFactory factory() {
			return this.dataBuffer.factory();
		}

		@Override
		public int indexOf(IntPredicate predicate, int fromIndex) {
			return this.dataBuffer.indexOf(predicate, fromIndex);
		}

		@Override
		public int lastIndexOf(IntPredicate predicate, int fromIndex) {
			return this.dataBuffer.lastIndexOf(predicate, fromIndex);
		}

		@Override
		public int readableByteCount() {
			return this.dataBuffer.readableByteCount();
		}

		@Override
		public int writableByteCount() {
			return this.dataBuffer.writableByteCount();
		}

		@Override
		public int readPosition() {
			return this.dataBuffer.readPosition();
		}

		@Override
		public DataBuffer readPosition(int readPosition) {
			return this.dataBuffer.readPosition(readPosition);
		}

		@Override
		public int writePosition() {
			return this.dataBuffer.writePosition();
		}

		@Override
		public DataBuffer writePosition(int writePosition) {
			return this.dataBuffer.writePosition(writePosition);
		}

		@Override
		public int capacity() {
			return this.dataBuffer.capacity();
		}

		@Override
		public DataBuffer capacity(int newCapacity) {
			return this.dataBuffer.capacity(newCapacity);
		}

279 280 281 282 283
		@Override
		public byte getByte(int index) {
			return this.dataBuffer.getByte(index);
		}

284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347
		@Override
		public byte read() {
			return this.dataBuffer.read();
		}

		@Override
		public DataBuffer read(byte[] destination) {
			return this.dataBuffer.read(destination);
		}

		@Override
		public DataBuffer read(byte[] destination, int offset,
				int length) {
			return this.dataBuffer.read(destination, offset, length);
		}

		@Override
		public DataBuffer write(byte b) {
			return this.dataBuffer.write(b);
		}

		@Override
		public DataBuffer write(byte[] source) {
			return this.dataBuffer.write(source);
		}

		@Override
		public DataBuffer write(byte[] source, int offset,
				int length) {
			return this.dataBuffer.write(source, offset, length);
		}

		@Override
		public DataBuffer write(
				DataBuffer... buffers) {
			return this.dataBuffer.write(buffers);
		}

		@Override
		public DataBuffer write(
				ByteBuffer... byteBuffers) {
			return this.dataBuffer.write(byteBuffers);
		}

		@Override
		public DataBuffer slice(int index, int length) {
			return this.dataBuffer.slice(index, length);
		}

		@Override
		public ByteBuffer asByteBuffer() {
			return this.dataBuffer.asByteBuffer();
		}

		@Override
		public ByteBuffer asByteBuffer(int index, int length) {
			return this.dataBuffer.asByteBuffer(index, length);
		}

		@Override
		public InputStream asInputStream() {
			return this.dataBuffer.asInputStream();
		}

348 349 350 351 352
		@Override
		public InputStream asInputStream(boolean releaseOnClose) {
			return this.dataBuffer.asInputStream(releaseOnClose);
		}

353 354 355
		@Override
		public OutputStream asOutputStream() {
			return this.dataBuffer.asOutputStream();
356
		}
357
	}
358
}