/* * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.springframework.http.client.reactive; import java.net.URI; import java.util.function.Function; import io.netty.buffer.ByteBufAllocator; import reactor.core.publisher.Mono; import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClientResponse; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; import org.springframework.http.HttpMethod; import org.springframework.util.Assert; /** * Reactor-Netty implementation of {@link ClientHttpConnector}. * * @author Brian Clozel * @since 5.0 * @see reactor.netty.http.client.HttpClient */ public class ReactorClientHttpConnector implements ClientHttpConnector { private final static Function defaultInitializer = client -> client.compress(true); private final HttpClient httpClient; /** * Default constructor. Initializes {@link HttpClient} via: *
	 * HttpClient.create().compress()
	 * 
*/ public ReactorClientHttpConnector() { this.httpClient = defaultInitializer.apply(HttpClient.create()); } /** * Constructor with externally managed Reactor Netty resources, including * {@link LoopResources} for event loop threads, and {@link ConnectionProvider} * for the connection pool. *

This constructor should be used only when you don't want the client * to participate in the Reactor Netty global resources. By default the * client participates in the Reactor Netty global resources held in * {@link reactor.netty.http.HttpResources}, which is recommended since * fixed, shared resources are favored for event loop concurrency. However, * consider declaring a {@link ReactorResourceFactory} bean with * {@code globaResources=true} in order to ensure the Reactor Netty global * resources are shut down when the Spring ApplicationContext is closed. * @param factory the resource factory to obtain the resources from * @param mapper a mapper for further initialization of the created client * @since 5.1 */ public ReactorClientHttpConnector(ReactorResourceFactory factory, Function mapper) { this.httpClient = defaultInitializer.andThen(mapper).apply(initHttpClient(factory)); } private static HttpClient initHttpClient(ReactorResourceFactory resourceFactory) { ConnectionProvider provider = resourceFactory.getConnectionProvider(); LoopResources resources = resourceFactory.getLoopResources(); Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?"); Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?"); return HttpClient.create(provider).tcpConfiguration(tcpClient -> tcpClient.runOn(resources)); } /** * Constructor with a pre-configured {@code HttpClient} instance. * @param httpClient the client to use * @since 5.1 */ public ReactorClientHttpConnector(HttpClient httpClient) { Assert.notNull(httpClient, "HttpClient is required"); this.httpClient = httpClient; } @Override public Mono connect(HttpMethod method, URI uri, Function> requestCallback) { if (!uri.isAbsolute()) { return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri)); } return this.httpClient .request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name())) .uri(uri.toString()) .send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound))) .responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc()))) .next(); } private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound nettyOutbound) { return new ReactorClientHttpRequest(method, uri, request, nettyOutbound); } private ClientHttpResponse adaptResponse(HttpClientResponse response, NettyInbound nettyInbound, ByteBufAllocator allocator) { return new ReactorClientHttpResponse(response, nettyInbound, allocator); } }