BodyExtractors.java 12.1 KB
Newer Older
1
/*
S
Sam Brannen 已提交
2
 * Copyright 2002-2017 the original author or authors.
3 4 5 6 7 8 9 10 11 12 13 14 15 16
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17
package org.springframework.web.reactive.function;
18 19

import java.util.List;
20
import java.util.Optional;
21
import java.util.function.Function;
22
import java.util.function.Supplier;
23 24 25 26 27 28
import java.util.stream.Collectors;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

29
import org.springframework.core.ParameterizedTypeReference;
30
import org.springframework.core.ResolvableType;
31
import org.springframework.core.io.buffer.DataBuffer;
32
import org.springframework.http.HttpMessage;
33
import org.springframework.http.MediaType;
34
import org.springframework.http.ReactiveHttpInputMessage;
35
import org.springframework.http.codec.HttpMessageReader;
36
import org.springframework.http.codec.multipart.Part;
37
import org.springframework.http.server.reactive.ServerHttpRequest;
38
import org.springframework.http.server.reactive.ServerHttpResponse;
39
import org.springframework.util.Assert;
40
import org.springframework.util.MultiValueMap;
41 42 43 44 45

/**
 * Implementations of {@link BodyExtractor} that read various bodies, such a reactive streams.
 *
 * @author Arjen Poutsma
46
 * @author Sebastien Deleuze
47 48 49 50
 * @since 5.0
 */
public abstract class BodyExtractors {

51
	private static final ResolvableType FORM_MAP_TYPE =
52 53
			ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class);

54
	private static final ResolvableType MULTIPART_MAP_TYPE = ResolvableType.forClassWithGenerics(
55 56
			MultiValueMap.class, String.class, Part.class);

57 58
	private static final ResolvableType PART_TYPE = ResolvableType.forClass(Part.class);

A
Arjen Poutsma 已提交
59
	private static final ResolvableType VOID_TYPE = ResolvableType.forClass(Void.class);
60

61 62 63 64 65 66
	/**
	 * Return a {@code BodyExtractor} that reads into a Reactor {@link Mono}.
	 * @param elementClass the class of element in the {@code Mono}
	 * @param <T> the element type
	 * @return a {@code BodyExtractor} that reads a mono
	 */
67
	public static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(Class<? extends T> elementClass) {
68 69 70 71 72 73
		Assert.notNull(elementClass, "'elementClass' must not be null");
		return toMono(ResolvableType.forClass(elementClass));
	}

	/**
	 * Return a {@code BodyExtractor} that reads into a Reactor {@link Mono}.
74 75 76 77 78 79 80 81 82 83
	 * The given {@link ParameterizedTypeReference} is used to pass generic type information, for
	 * instance when using the {@link org.springframework.web.reactive.function.client.WebClient WebClient}
	 * <pre class="code">
	 * Mono&lt;Map&lt;String, String&gt;&gt; body = this.webClient
	 *  .get()
	 *  .uri("http://example.com")
	 *  .exchange()
	 *  .flatMap(r -> r.body(toMono(new ParameterizedTypeReference&lt;Map&lt;String,String&gt;&gt;() {})));
	 * </pre>
	 * @param typeReference a reference to the type of element in the {@code Mono}
84 85 86
	 * @param <T> the element type
	 * @return a {@code BodyExtractor} that reads a mono
	 */
87 88 89 90 91 92
	public static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ParameterizedTypeReference<T> typeReference) {
		Assert.notNull(typeReference, "'typeReference' must not be null");
		return toMono(ResolvableType.forType(typeReference.getType()));
	}

	static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ResolvableType elementType) {
93
		Assert.notNull(elementType, "'elementType' must not be null");
94
		return (inputMessage, context) -> readWithMessageReaders(inputMessage, context,
95
				elementType,
96
				(HttpMessageReader<T> reader) -> {
97 98 99 100 101 102 103 104 105
					Optional<ServerHttpResponse> serverResponse = context.serverResponse();
					if (serverResponse.isPresent() && inputMessage instanceof ServerHttpRequest) {
						return reader.readMono(elementType, elementType, (ServerHttpRequest) inputMessage,
								serverResponse.get(), context.hints());
					}
					else {
						return reader.readMono(elementType, inputMessage, context.hints());
					}
				},
106 107
				ex -> (inputMessage.getHeaders().getContentType() == null) ?
						Mono.from(permitEmptyOrFail(inputMessage, ex)) : Mono.error(ex),
108
				Mono::empty);
109 110 111 112 113 114
	}

	/**
	 * Return a {@code BodyExtractor} that reads into a Reactor {@link Flux}.
	 * @param elementClass the class of element in the {@code Flux}
	 * @param <T> the element type
115
	 * @return a {@code BodyExtractor} that reads a flux
116
	 */
117
	public static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(Class<? extends T> elementClass) {
118 119 120 121 122 123
		Assert.notNull(elementClass, "'elementClass' must not be null");
		return toFlux(ResolvableType.forClass(elementClass));
	}

	/**
	 * Return a {@code BodyExtractor} that reads into a Reactor {@link Flux}.
124 125 126 127 128 129 130 131 132 133
	 * The given {@link ParameterizedTypeReference} is used to pass generic type information, for
	 * instance when using the {@link org.springframework.web.reactive.function.client.WebClient WebClient}
	 * <pre class="code">
	 * Flux&lt;ServerSentEvent&lt;String&gt;&gt; body = this.webClient
	 *  .get()
	 *  .uri("http://example.com")
	 *  .exchange()
	 *  .flatMap(r -> r.body(toFlux(new ParameterizedTypeReference&lt;ServerSentEvent&lt;String&gt;&gt;() {})));
	 * </pre>
	 * @param typeReference a reference to the type of element in the {@code Flux}
134
	 * @param <T> the element type
135
	 * @return a {@code BodyExtractor} that reads a flux
136
	 */
137 138 139 140 141
	public static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(ParameterizedTypeReference<T> typeReference) {
		Assert.notNull(typeReference, "'typeReference' must not be null");
		return toFlux(ResolvableType.forType(typeReference.getType()));
	}

142
	@SuppressWarnings("unchecked")
143
	static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(ResolvableType elementType) {
144
		Assert.notNull(elementType, "'elementType' must not be null");
145
		return (inputMessage, context) -> readWithMessageReaders(inputMessage, context,
146
				elementType,
147
				(HttpMessageReader<T> reader) -> {
148 149 150 151 152 153 154 155 156
					Optional<ServerHttpResponse> serverResponse = context.serverResponse();
					if (serverResponse.isPresent() && inputMessage instanceof ServerHttpRequest) {
						return reader.read(elementType, elementType, (ServerHttpRequest) inputMessage,
								serverResponse.get(), context.hints());
					}
					else {
						return reader.read(elementType, inputMessage, context.hints());
					}
				},
157 158
				ex -> (inputMessage.getHeaders().getContentType() == null) ?
						permitEmptyOrFail(inputMessage, ex) : Flux.error(ex),
159
				Flux::empty);
160 161
	}

162 163 164 165 166 167 168
	@SuppressWarnings("unchecked")
	private static <T> Flux<T> permitEmptyOrFail(ReactiveHttpInputMessage message, UnsupportedMediaTypeException ex) {
		return message.getBody().doOnNext(buffer -> {
			throw ex;
		}).map(o -> (T) o);
	}

169 170 171 172
	/**
	 * Return a {@code BodyExtractor} that reads form data into a {@link MultiValueMap}.
	 * @return a {@code BodyExtractor} that reads form data
	 */
173 174 175 176
	// Note that the returned BodyExtractor is parameterized to ServerHttpRequest, not
	// ReactiveHttpInputMessage like other methods, since reading form data only typically happens on
	// the server-side
	public static BodyExtractor<Mono<MultiValueMap<String, String>>, ServerHttpRequest> toFormData() {
177
		return (serverRequest, context) -> {
178
			HttpMessageReader<MultiValueMap<String, String>> messageReader =
179
					messageReader(FORM_MAP_TYPE, MediaType.APPLICATION_FORM_URLENCODED, context);
180
			return context.serverResponse()
181 182
					.map(serverResponse -> messageReader.readMono(FORM_MAP_TYPE, FORM_MAP_TYPE, serverRequest, serverResponse, context.hints()))
					.orElseGet(() -> messageReader.readMono(FORM_MAP_TYPE, serverRequest, context.hints()));
183
		};
184 185
	}

186
	/**
187 188
	 * Return a {@code BodyExtractor} that reads multipart (i.e. file upload) form data into a
	 * {@link MultiValueMap}.
189 190 191 192 193 194 195 196
	 * @return a {@code BodyExtractor} that reads multipart data
	 */
	// Note that the returned BodyExtractor is parameterized to ServerHttpRequest, not
	// ReactiveHttpInputMessage like other methods, since reading form data only typically happens on
	// the server-side
	public static BodyExtractor<Mono<MultiValueMap<String, Part>>, ServerHttpRequest> toMultipartData() {
		return (serverRequest, context) -> {
			HttpMessageReader<MultiValueMap<String, Part>> messageReader =
197
					messageReader(MULTIPART_MAP_TYPE, MediaType.MULTIPART_FORM_DATA, context);
198
			return context.serverResponse()
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
					.map(serverResponse -> messageReader.readMono(MULTIPART_MAP_TYPE,
							MULTIPART_MAP_TYPE, serverRequest, serverResponse, context.hints()))
					.orElseGet(() -> messageReader.readMono(MULTIPART_MAP_TYPE, serverRequest, context.hints()));
		};
	}

	/**
	 * Return a {@code BodyExtractor} that reads multipart (i.e. file upload) form data into a
	 * {@link MultiValueMap}.
	 * @return a {@code BodyExtractor} that reads multipart data
	 */
	// Note that the returned BodyExtractor is parameterized to ServerHttpRequest, not
	// ReactiveHttpInputMessage like other methods, since reading form data only typically happens on
	// the server-side
	public static BodyExtractor<Flux<Part>, ServerHttpRequest> toParts() {
		return (serverRequest, context) -> {
			HttpMessageReader<Part> messageReader =
					messageReader(PART_TYPE, MediaType.MULTIPART_FORM_DATA, context);
			return context.serverResponse()
					.map(serverResponse -> messageReader.read(PART_TYPE, PART_TYPE, serverRequest, serverResponse, context.hints()))
					.orElseGet(() -> messageReader.read(PART_TYPE, serverRequest, context.hints()));
220 221 222
		};
	}

223 224 225 226 227 228 229 230 231 232 233 234
	/**
	 * Return a {@code BodyExtractor} that returns the body of the message as a {@link Flux} of
	 * {@link DataBuffer}s.
	 * <p><strong>Note</strong> that the returned buffers should be released after usage by calling
	 * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)}
	 * @return a {@code BodyExtractor} that returns the body
	 * @see ReactiveHttpInputMessage#getBody()
	 */
	public static BodyExtractor<Flux<DataBuffer>, ReactiveHttpInputMessage> toDataBuffers() {
		return (inputMessage, context) -> inputMessage.getBody();
	}

J
Juergen Hoeller 已提交
235

236
	private static <T, S extends Publisher<T>> S readWithMessageReaders(
J
Juergen Hoeller 已提交
237
			ReactiveHttpInputMessage inputMessage, BodyExtractor.Context context, ResolvableType elementType,
238 239
			Function<HttpMessageReader<T>, S> readerFunction,
			Function<UnsupportedMediaTypeException, S> unsupportedError,
240
			Supplier<S> empty) {
241

A
Arjen Poutsma 已提交
242
		if (VOID_TYPE.equals(elementType)) {
243 244
			return empty.get();
		}
245
		MediaType contentType = contentType(inputMessage);
246 247
		List<HttpMessageReader<?>> messageReaders = context.messageReaders();
		return messageReaders.stream()
248
				.filter(r -> r.canRead(elementType, contentType))
249 250 251 252
				.findFirst()
				.map(BodyExtractors::<T>cast)
				.map(readerFunction)
				.orElseGet(() -> {
253
					List<MediaType> supportedMediaTypes = messageReaders.stream()
254 255
							.flatMap(reader -> reader.getReadableMediaTypes().stream())
							.collect(Collectors.toList());
256 257
					UnsupportedMediaTypeException error =
							new UnsupportedMediaTypeException(contentType, supportedMediaTypes);
258 259 260 261
					return unsupportedError.apply(error);
				});
	}

262 263
	private static <T> HttpMessageReader<T> messageReader(ResolvableType elementType,
			MediaType mediaType, BodyExtractor.Context context) {
264
		return context.messageReaders().stream()
265
				.filter(messageReader -> messageReader.canRead(elementType, mediaType))
J
Juergen Hoeller 已提交
266
				.findFirst()
267
				.map(BodyExtractors::<T>cast)
268
				.orElseThrow(() -> new IllegalStateException(
269 270
						"Could not find HttpMessageReader that supports \"" + mediaType +
								"\" and \"" + elementType + "\""));
271 272
	}

273 274
	private static MediaType contentType(HttpMessage message) {
		MediaType result = message.getHeaders().getContentType();
275 276 277 278 279 280 281 282 283
		return result != null ? result : MediaType.APPLICATION_OCTET_STREAM;
	}

	@SuppressWarnings("unchecked")
	private static <T> HttpMessageReader<T> cast(HttpMessageReader<?> messageReader) {
		return (HttpMessageReader<T>) messageReader;
	}

}