RxJava1ResponseExtractors.java 5.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
/*
 * Copyright 2002-2016 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.web.client.reactive.support;

import java.util.List;
import java.util.Optional;

import org.springframework.core.ResolvableType;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.converter.reactive.HttpMessageConverter;
import org.springframework.web.client.reactive.ResponseExtractor;

import reactor.core.converter.RxJava1ObservableConverter;
import reactor.core.converter.RxJava1SingleConverter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.Single;

/**
 * Static factory methods for {@link ResponseExtractor}
 * based on the {@link Observable} and {@link Single} API.
 *
 * @author Brian Clozel
 */
R
Rossen Stoyanchev 已提交
43
public class 	RxJava1ResponseExtractors {
44 45 46 47

	/**
	 * Extract the response body and decode it, returning it as a {@code Single<T>}
	 */
R
Rossen Stoyanchev 已提交
48
	@SuppressWarnings("unchecked")
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
	public static <T> ResponseExtractor<Single<T>> body(Class<T> sourceClass) {

		ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
		return (clientResponse, messageConverters) -> (Single<T>) RxJava1SingleConverter
				.fromPublisher(clientResponse
						.flatMap(resp -> decodeResponseBody(resp, resolvableType, messageConverters)).next());
	}

	/**
	 * Extract the response body and decode it, returning it as an {@code Observable<T>}
	 */
	public static <T> ResponseExtractor<Observable<T>> bodyStream(Class<T> sourceClass) {

		ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
		return (clientResponse, messageConverters) -> RxJava1ObservableConverter
				.fromPublisher(clientResponse
						.flatMap(resp -> decodeResponseBody(resp, resolvableType, messageConverters)));
	}

	/**
	 * Extract the full response body as a {@code ResponseEntity}
	 * with its body decoded as a single type {@code T}
	 */
R
Rossen Stoyanchev 已提交
72
	@SuppressWarnings("unchecked")
73 74 75
	public static <T> ResponseExtractor<Single<ResponseEntity<T>>> response(Class<T> sourceClass) {

		ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
R
Rossen Stoyanchev 已提交
76
		return (clientResponse, messageConverters) ->
77 78 79 80 81 82
				RxJava1SingleConverter.fromPublisher(clientResponse
						.then(response ->
								Mono.when(
										decodeResponseBody(response, resolvableType, messageConverters).next(),
										Mono.just(response.getHeaders()),
										Mono.just(response.getStatusCode())))
R
Rossen Stoyanchev 已提交
83 84
						.map(tuple ->
								new ResponseEntity<>((T) tuple.getT1(), tuple.getT2(), tuple.getT3())));
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
	}

	/**
	 * Extract the full response body as a {@code ResponseEntity}
	 * with its body decoded as an {@code Observable<T>}
	 */
	public static <T> ResponseExtractor<Single<ResponseEntity<Observable<T>>>> responseStream(Class<T> sourceClass) {
		ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
		return (clientResponse, messageConverters) -> RxJava1SingleConverter.fromPublisher(clientResponse
				.map(response -> new ResponseEntity<>(
						RxJava1ObservableConverter
								.fromPublisher(decodeResponseBody(response, resolvableType, messageConverters)),
						response.getHeaders(),
						response.getStatusCode())));
	}

	/**
	 * Extract the response headers as an {@code HttpHeaders} instance
	 */
	public static ResponseExtractor<Single<HttpHeaders>> headers() {
		return (clientResponse, messageConverters) -> RxJava1SingleConverter
				.fromPublisher(clientResponse.map(resp -> resp.getHeaders()));
	}

R
Rossen Stoyanchev 已提交
109
	@SuppressWarnings("unchecked")
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
	protected static <T> Flux<T> decodeResponseBody(ClientHttpResponse response, ResolvableType responseType,
			List<HttpMessageConverter<?>> messageConverters) {

		MediaType contentType = response.getHeaders().getContentType();
		Optional<HttpMessageConverter<?>> converter = resolveConverter(messageConverters, responseType, contentType);
		if (!converter.isPresent()) {
			return Flux.error(new IllegalStateException("Could not decode response body of type '" + contentType +
					"' with target type '" + responseType.toString() + "'"));
		}
		return (Flux<T>) converter.get().read(responseType, response);
	}


	protected static Optional<HttpMessageConverter<?>> resolveConverter(List<HttpMessageConverter<?>> messageConverters,
			ResolvableType type, MediaType mediaType) {
		return messageConverters.stream().filter(e -> e.canRead(type, mediaType)).findFirst();
	}
}