提交 c98bb7ff 编写于 作者: A Allen Wang

upgrade to new RxNetty

上级 1cdb043f
......@@ -82,7 +82,7 @@ project(':ribbon-rxnetty') {
compile project(':ribbon-http')
compile project(':ribbon-loadbalancer')
compile project(':ribbon-core')
compile 'com.netflix.rxnetty:rx-netty:0.2.4'
compile 'com.netflix.rxnetty:rx-netty:0.2.6'
testCompile 'com.google.mockwebserver:mockwebserver:20130706'
testCompile project(':ribbon-test')
}
......
......@@ -36,14 +36,18 @@ import io.netty.util.ReferenceCounted;
import io.reactivex.netty.client.RxClient;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.HttpClient;
import io.reactivex.netty.protocol.http.HttpClientBuilder;
import io.reactivex.netty.protocol.http.ObservableHttpResponse;
import io.reactivex.netty.protocol.text.sse.SSEEvent;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientBuilder;
import io.reactivex.netty.protocol.http.client.HttpRequest;
import io.reactivex.netty.protocol.http.client.HttpResponse;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
......@@ -62,8 +66,7 @@ import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;
import com.netflix.client.http.HttpRequest;
import com.netflix.client.http.HttpResponse;
//import com.netflix.client.http.HttpResponse;
import com.netflix.client.http.UnexpectedHttpResponseException;
import com.netflix.serialization.Deserializer;
import com.netflix.serialization.HttpSerializationContext;
......@@ -157,9 +160,9 @@ public class NettyHttpClient implements Closeable {
pipeline.addLast(FullHttpResponseHandler.NAME, new FullHttpResponseHandler<T>(serializationFactory, request, typeDef, requestConfig));
}
}
private FullHttpRequest getHttpRequest(HttpRequest request, IClientConfig requestConfig) throws ClientException {
FullHttpRequest r = null;
/*
private HttpRequest<ByteBuf> getHttpRequest(com.netflix.client.http.HttpRequest request, IClientConfig requestConfig) throws ClientException {
HttpRequest<ByteBuf> r = null;
Object entity = request.getEntity();
String uri = request.getUri().toString();
if (request.getQueryParams() != null) {
......@@ -198,6 +201,7 @@ public class NettyHttpClient implements Closeable {
buf = Unpooled.wrappedBuffer(content);
contentLength = content.length;
}
r = HttpRequest
r = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(request.getVerb().name()), uri, buf);
if (contentLength >= 0) {
r.headers().set(HttpHeaders.Names.CONTENT_LENGTH, contentLength);
......@@ -215,7 +219,7 @@ public class NettyHttpClient implements Closeable {
}
return r;
}
*/
public IClientConfig getConfig() {
return config;
}
......@@ -224,7 +228,7 @@ public class NettyHttpClient implements Closeable {
return serializationFactory;
}
protected <S> S getProperty(IClientConfigKey<S> key, HttpRequest request, @Nullable IClientConfig requestConfig) {
protected <S> S getProperty(IClientConfigKey<S> key, com.netflix.client.http.HttpRequest request, @Nullable IClientConfig requestConfig) {
if (requestConfig != null && requestConfig.getPropertyWithType(key) != null) {
return requestConfig.getPropertyWithType(key);
} else if (request.getOverrideConfig() != null && request.getOverrideConfig().getPropertyWithType(key) != null) {
......@@ -233,7 +237,15 @@ public class NettyHttpClient implements Closeable {
return config.getPropertyWithType(key);
}
}
protected <S> S getProperty(IClientConfigKey<S> key, @Nullable IClientConfig requestConfig) {
if (requestConfig != null && requestConfig.getPropertyWithType(key) != null) {
return requestConfig.getPropertyWithType(key);
} else {
return config.getPropertyWithType(key);
}
}
/**
* Create an Observable for entities parsed from raw Server-Sent-Event using the default {@link SerializationFactory} and
* client's default configuration.
......@@ -279,7 +291,7 @@ public class NettyHttpClient implements Closeable {
*/
public <T> Observable<ServerSentEvent<T>> createServerSentEventEntityObservable(final HttpRequest request,
final TypeDef<T> typeDef, @Nullable final IClientConfig requestConfig) {
Observable<ObservableHttpResponse<SSEEvent>> observable = createServerSentEventObservable(request, requestConfig);
Observable<HttpResponse<SSEEvent>> observable = createServerSentEventObservable(request, requestConfig);
return observable.flatMap(new Func1<ObservableHttpResponse<SSEEvent>, Observable<ServerSentEvent<T>>>() {
@Override
......@@ -418,26 +430,28 @@ public class NettyHttpClient implements Closeable {
* Create an Observable of ObservableHttpResponse. Use this API if you need to manipulate the Netty pipeline
* and produce custom observable content.
*/
public <T> Observable<ObservableHttpResponse<T>> createObservableHttpResponse(final HttpRequest request,
final PipelineConfigurator<T, FullHttpRequest> protocolHandler, @Nullable final IClientConfig requestConfig,
public <I,O> Observable<HttpResponse<O>> createObservableHttpResponse(final HttpRequest<I> request,
final PipelineConfigurator<HttpResponse<O>, HttpRequest<I>> configurator, @Nullable final IClientConfig requestConfig,
final RxClient.ClientConfig rxClientConfig) {
Preconditions.checkNotNull(request);
Preconditions.checkNotNull(protocolHandler);
Preconditions.checkNotNull(configurator);
Preconditions.checkNotNull(rxClientConfig);
HttpClientBuilder<FullHttpRequest, T> clientBuilder =
new HttpClientBuilder<FullHttpRequest, T>(request.getUri().getHost(), request.getUri().getPort());
int requestConnectTimeout = getProperty(IClientConfigKey.CommonKeys.ConnectTimeout, request, requestConfig);
HttpClient<FullHttpRequest, T> client = clientBuilder.channelOption(
ChannelOption.CONNECT_TIMEOUT_MILLIS, requestConnectTimeout).eventloop(getNextEventGroupLoop())
.pipelineConfigurator(protocolHandler).config(rxClientConfig).build();
FullHttpRequest r = null;
URI uri = null;
try {
r = getHttpRequest(request, requestConfig);
} catch (final Exception e) {
uri = new URI(request.getUri());
} catch (URISyntaxException e) {
return Observable.error(e);
}
return client.submit(r);
String host = uri.getHost();
int port = uri.getPort();
HttpClientBuilder<I, O> clientBuilder =
new HttpClientBuilder<I, O>(host, port).pipelineConfigurator(configurator);
int requestConnectTimeout = getProperty(IClientConfigKey.CommonKeys.ConnectTimeout, requestConfig);
HttpClient<I, O> client = clientBuilder.channelOption(
ChannelOption.CONNECT_TIMEOUT_MILLIS, requestConnectTimeout).eventloop(getNextEventGroupLoop()).config(rxClientConfig).build();
return client.submit(request);
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册