提交 a2a411fb 编写于 作者: T Tomasz Bak

Add ProxyLifeCycle interface. Update proxy example.

上级 276b8666
......@@ -48,7 +48,7 @@ public class HttpResourceGroup extends ResourceGroup<HttpRequestTemplate<?>> {
return headers;
}
HttpClient<ByteBuf, ByteBuf> getClient() {
public HttpClient<ByteBuf, ByteBuf> getClient() {
return client;
}
}
/*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.ribbonclientextensions.proxy;
public interface ProxyLifeCycle {
boolean isShutDown();
void shutdown();
}
......@@ -26,9 +26,11 @@ import java.util.Map;
* @author Tomasz Bak
*/
public class RibbonDynamicProxy<T> implements InvocationHandler {
private final ProxyLifeCycle lifeCycle;
private final Map<Method, MethodTemplateExecutor> templateExecutorMap;
public RibbonDynamicProxy(Class<T> clientInterface, HttpResourceGroup httpResourceGroup) {
this.lifeCycle = new ProxyLifecycleImpl(httpResourceGroup);
ClassTemplate<T> classTemplate = ClassTemplate.from(clientInterface);
if (httpResourceGroup == null) {
httpResourceGroup = new HttpResourceGroupFactory<T>(classTemplate).createResourceGroup();
......@@ -42,15 +44,50 @@ public class RibbonDynamicProxy<T> implements InvocationHandler {
if (template != null) {
return template.executeFromTemplate(args);
}
if (ProxyLifeCycle.class.isAssignableFrom(method.getDeclaringClass())) {
return handleProxyLifeCycle(method, args);
}
// This must be one of the Object methods. Lets run it on the handler itself.
return Utils.executeOnInstance(this, method, args);
}
private Object handleProxyLifeCycle(Method method, Object[] args) {
try {
return method.invoke(lifeCycle, args);
} catch (Exception e) {
throw new RibbonProxyException("ProxyLifeCycle call failure on method " + method.getName(), e);
}
}
@Override
public String toString() {
return "RibbonDynamicProxy{...}";
}
private static class ProxyLifecycleImpl implements ProxyLifeCycle {
private final HttpResourceGroup httpResourceGroup;
private volatile boolean shutdownFlag;
public ProxyLifecycleImpl(HttpResourceGroup httpResourceGroup) {
this.httpResourceGroup = httpResourceGroup;
}
@Override
public boolean isShutDown() {
return shutdownFlag;
}
@Override
public synchronized void shutdown() {
if (!shutdownFlag) {
httpResourceGroup.getClient().shutdown();
shutdownFlag = true;
}
}
}
@SuppressWarnings("unchecked")
public static <T> T newInstance(Class<T> clientInterface, HttpResourceGroup httpResourceGroup) {
if (!clientInterface.isInterface()) {
......@@ -58,7 +95,7 @@ public class RibbonDynamicProxy<T> implements InvocationHandler {
}
return (T) Proxy.newProxyInstance(
Thread.currentThread().getContextClassLoader(),
new Class[]{clientInterface},
new Class[]{clientInterface, ProxyLifeCycle.class},
new RibbonDynamicProxy<T>(clientInterface, httpResourceGroup)
);
}
......
......@@ -5,9 +5,12 @@ import com.netflix.ribbonclientextensions.http.HttpResourceGroup;
import com.netflix.ribbonclientextensions.proxy.sample.Movie;
import com.netflix.ribbonclientextensions.proxy.sample.MovieServiceInterfaces.SampleMovieService;
import com.netflix.ribbonclientextensions.proxy.sample.MovieServiceInterfaces.SampleMovieServiceWithResourceGroupNameAnnotation;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.client.HttpClient;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
......@@ -18,6 +21,7 @@ import java.util.Map;
import static com.netflix.ribbonclientextensions.proxy.Utils.*;
import static org.easymock.EasyMock.*;
import static org.powermock.api.easymock.PowerMock.createMock;
import static org.powermock.api.easymock.PowerMock.expectLastCall;
import static org.powermock.api.easymock.PowerMock.*;
import static org.testng.Assert.*;
......@@ -28,11 +32,17 @@ import static org.testng.Assert.*;
@PrepareForTest({RibbonDynamicProxy.class, MethodTemplateExecutor.class})
public class RibbonDynamicProxyTest {
private HttpResourceGroup httpResourceGroupMock = createMock(HttpResourceGroup.class);
@Mock
private HttpResourceGroup httpResourceGroupMock;
private HttpResourceGroupFactory httpResourceGroupFactoryMock = createMock(HttpResourceGroupFactory.class);
@Mock
private HttpResourceGroupFactory httpResourceGroupFactoryMock;
private RibbonRequest ribbonRequestMock = createMock(RibbonRequest.class);
@Mock
private RibbonRequest ribbonRequestMock;
@Mock
private HttpClient<ByteBuf, ByteBuf> httpClientMock;
@Before
public void setUp() throws Exception {
......@@ -72,6 +82,21 @@ public class RibbonDynamicProxyTest {
assertNotNull(ribbonMovie);
}
@Test
public void testLifeCycleShutdown() throws Exception {
initializeSampleMovieServiceMocks();
expect(httpResourceGroupMock.getClient()).andReturn(httpClientMock);
httpClientMock.shutdown();
expectLastCall();
replayAll();
SampleMovieService service = RibbonDynamicProxy.newInstance(SampleMovieService.class, httpResourceGroupMock);
ProxyLifeCycle proxyLifeCycle = (ProxyLifeCycle) service;
proxyLifeCycle.shutdown();
assertTrue(proxyLifeCycle.isShutDown());
}
@Test
public void testPlainObjectInvocations() throws Exception {
initializeSampleMovieServiceMocks();
......
......@@ -21,12 +21,10 @@ import com.netflix.ribbonclientextensions.proxy.annotation.CacheProviders;
import com.netflix.ribbonclientextensions.proxy.annotation.CacheProviders.Provider;
import com.netflix.ribbonclientextensions.proxy.annotation.Content;
import com.netflix.ribbonclientextensions.proxy.annotation.ContentTransformerClass;
import com.netflix.ribbonclientextensions.proxy.annotation.EvCache;
import com.netflix.ribbonclientextensions.proxy.annotation.Http;
import com.netflix.ribbonclientextensions.proxy.annotation.Http.Header;
import com.netflix.ribbonclientextensions.proxy.annotation.Http.HttpMethod;
import com.netflix.ribbonclientextensions.proxy.annotation.Hystrix;
import com.netflix.ribbonclientextensions.proxy.annotation.ResourceGroup;
import com.netflix.ribbonclientextensions.proxy.annotation.TemplateName;
import com.netflix.ribbonclientextensions.proxy.annotation.Var;
import io.netty.buffer.ByteBuf;
......@@ -34,7 +32,6 @@ import io.netty.buffer.ByteBuf;
/**
* @author Tomasz Bak
*/
@ResourceGroup(name = "movieServiceGroup")
public interface MovieService {
@TemplateName("recommendationsByUserId")
......@@ -43,15 +40,13 @@ public interface MovieService {
path = "/users/{userId}/recommendations",
headers = {
@Header(name = "X-Platform-Version", value = "xyz"),
@Header(name = "X-Auth-Token", value = "abc"),
@Header(name = "X-Auth-Token", value = "abc")
})
@Hystrix(
cacheKey = "userRecommendations/{userId}",
validator = RecommendationServiceResponseValidator.class,
fallbackHandler = RecommendationServiceFallbackHandler.class)
@CacheProviders(@Provider(key = "{userId}", provider = InMemoryCacheProviderFactory.class))
@EvCache(name = "recommendationsByUserId", appName = "recommendations", cacheKeyTemplate = "{userId}", ttl = 50,
enableZoneFallback = true, transcoder = MovieEVCacheTranscoder.class)
RibbonRequest<ByteBuf> recommendationsByUserId(@Var("userId") String userId);
@TemplateName("recommendationsBy")
......@@ -60,15 +55,13 @@ public interface MovieService {
path = "/recommendations?category={category}&ageGroup={ageGroup}",
headers = {
@Header(name = "X-Platform-Version", value = "xyz"),
@Header(name = "X-Auth-Token", value = "abc"),
@Header(name = "X-Auth-Token", value = "abc")
})
@Hystrix(
cacheKey = "{category},{ageGroup}",
validator = RecommendationServiceResponseValidator.class,
fallbackHandler = RecommendationServiceFallbackHandler.class)
@CacheProviders(@Provider(key = "{category},{ageGroup}", provider = InMemoryCacheProviderFactory.class))
@EvCache(name = "recommendations", appName = "recommendations", cacheKeyTemplate = "{category},{ageGroup}", ttl = 50,
enableZoneFallback = true, transcoder = MovieEVCacheTranscoder.class)
RibbonRequest<ByteBuf> recommendationsBy(@Var("category") String category, @Var("ageGroup") String ageGroup);
@TemplateName("registerMovie")
......@@ -77,7 +70,7 @@ public interface MovieService {
path = "/movies",
headers = {
@Header(name = "X-Platform-Version", value = "xyz"),
@Header(name = "X-Auth-Token", value = "abc"),
@Header(name = "X-Auth-Token", value = "abc")
})
@Hystrix(validator = RecommendationServiceResponseValidator.class)
@ContentTransformerClass(RxMovieTransformer.class)
......@@ -89,8 +82,8 @@ public interface MovieService {
path = "/users/{userId}/recommendations",
headers = {
@Header(name = "X-Platform-Version", value = "xyz"),
@Header(name = "X-Auth-Token", value = "abc"),
@Header(name = "X-Auth-Token", value = "abc")
})
@Hystrix(validator = RecommendationServiceResponseValidator.class)
RibbonRequest<Void> updateRecommendations(@Var("userId") String userId, @Var("movieId") String movieId);
RibbonRequest<Void> updateRecommendations(@Var("userId") String userId, @Content String movieId);
}
......@@ -16,14 +16,23 @@
package com.netflix.ribbon.examples.proxy;
import com.netflix.hystrix.util.HystrixTimer;
import com.netflix.ribbonclientextensions.ClientOptions;
import com.netflix.ribbonclientextensions.Ribbon;
import com.netflix.ribbonclientextensions.http.HttpResourceGroup;
import com.netflix.ribbonclientextensions.proxy.ProxyLifeCycle;
import io.netty.buffer.ByteBuf;
import rx.Notification;
import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
import static java.lang.String.*;
......@@ -32,17 +41,48 @@ import static java.lang.String.*;
*/
public class RxMovieProxyExample {
private static final String TEST_USER = "user1";
private static final Pattern NEW_LINE_SPLIT_RE = Pattern.compile("\n");
private final MovieService movieService;
public RxMovieProxyExample() {
movieService = Ribbon.from(MovieService.class);
HttpResourceGroup httpResourceGroup = Ribbon.createHttpResourceGroup("movieServiceClient",
ClientOptions.create()
.withMaxAutoRetriesNextServer(3)
.withConfigurationBasedServerList("localhost:" + RxMovieServer.DEFAULT_PORT));
movieService = Ribbon.from(MovieService.class, httpResourceGroup);
}
private void registerMovies() throws URISyntaxException {
System.out.println("Registering movies...");
movieService.registerMovie(Movie.ORANGE_IS_THE_NEW_BLACK);
movieService.registerMovie(Movie.BRAKING_BAD);
movieService.registerMovie(Movie.HOUSE_OF_CARDS);
private boolean registerMovies() throws URISyntaxException {
System.out.print("Registering movies...");
Notification<Void> status = Observable.concat(
movieService.registerMovie(Movie.ORANGE_IS_THE_NEW_BLACK).observe(),
movieService.registerMovie(Movie.BRAKING_BAD).observe(),
movieService.registerMovie(Movie.HOUSE_OF_CARDS).observe()).materialize().toBlocking().last();
if (status.isOnError()) {
System.err.println("ERROR: movie registration failure");
status.getThrowable().printStackTrace();
return false;
}
System.out.println("Movies registered.");
return true;
}
private boolean updateRecommendations() {
System.out.print("Updating recommendations for user " + TEST_USER + "...");
Notification<Void> status = Observable.concat(
movieService.updateRecommendations(TEST_USER, Movie.ORANGE_IS_THE_NEW_BLACK.getId()).observe(),
movieService.updateRecommendations(TEST_USER, Movie.BRAKING_BAD.getId()).observe()).materialize().toBlocking().last();
if (status.isOnError()) {
System.err.println("ERROR: user recommendations update failure");
status.getThrowable().printStackTrace();
return false;
}
System.out.println("User recommendations updated.");
return true;
}
private void searchCatalog() {
......@@ -50,23 +90,41 @@ public class RxMovieProxyExample {
List<String> searches = new ArrayList<String>(2);
Collections.addAll(searches, "findById", "findRawMovieById", "findMovie(name, category)");
Observable.concat(
movieService.recommendationsByUserId("1").observe(),
movieService.recommendationsBy("Orange is the New Black", "Drama").observe()
).cast(Movie.class).zip(searches, new Func2<Movie, String, Void>() {
movieService.recommendationsByUserId(TEST_USER).observe(),
movieService.recommendationsBy("Drama", "Adults").observe()
).flatMap(new Func1<ByteBuf, Observable<List<Movie>>>() {
@Override
public Observable<List<Movie>> call(ByteBuf byteBuf) {
List<Movie> movies = new ArrayList<Movie>();
String lines = byteBuf.toString(Charset.defaultCharset());
for (String line : NEW_LINE_SPLIT_RE.split(lines)) {
movies.add(Movie.from(line));
}
return Observable.just(movies);
}
}).zip(searches, new Func2<List<Movie>, String, Void>() {
@Override
public Void call(Movie movie, String query) {
System.out.println(format("%s=%s", query, movie));
public Void call(List<Movie> movies, String query) {
System.out.println(format("%s=%s", query, movies));
return null;
}
}).toBlocking().last();
}
public void shutdown() {
System.out.println("Shutting down the proxy...");
((ProxyLifeCycle) movieService).shutdown();
HystrixTimer.reset(); // FIXME This should be done selectively in RibbonDynamicProxy
}
public static void main(String[] args) {
System.out.println("Starting movie service proxy...");
try {
RxMovieProxyExample example = new RxMovieProxyExample();
example.registerMovies();
example.updateRecommendations();
example.searchCatalog();
example.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
......
......@@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.logging.LogLevel;
import io.netty.util.internal.ConcurrentSet;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
......@@ -77,21 +78,22 @@ public class RxMovieServer {
response.setStatus(HttpResponseStatus.NOT_FOUND);
return response.close();
}
}).pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator()).build();
}).pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator()).enableWireLogging(LogLevel.DEBUG).build();
System.out.println("RxMovie server started...");
return server;
}
private Observable<Void> handleRecommendationsByUserId(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
System.out.println("Recommendations by user id request: " + request.getPath());
final String userId = userIdFromPath(request.getPath());
if (userId == null) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return Observable.error(new IllegalArgumentException("Invalid URL"));
return response.close();
}
if (!userRecommendations.containsKey(userId)) {
response.setStatus(HttpResponseStatus.NOT_FOUND);
return Observable.error(new IllegalArgumentException("No recommendations for the user " + userId));
return response.close();
}
StringBuilder builder = new StringBuilder();
......@@ -107,11 +109,12 @@ public class RxMovieServer {
}
private Observable<Void> handleRecommendationsBy(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
System.out.println("Recommendations by multiple criteria request: " + request.getPath());
List<String> category = request.getQueryParameters().get("category");
List<String> ageGroup = request.getQueryParameters().get("ageGroup");
if (category.isEmpty() || ageGroup.isEmpty()) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return Observable.error(new IllegalArgumentException("Invalid URL"));
return response.close();
}
StringBuilder builder = new StringBuilder();
......@@ -129,16 +132,17 @@ public class RxMovieServer {
}
private Observable<Void> handleUpdateRecommendationsForUser(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
System.out.println("HTTP request -> update recommendations for user: " + request.getPath());
final String userId = userIdFromPath(request.getPath());
if (userId == null) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return Observable.error(new IllegalArgumentException("Invalid URL"));
return response.close();
}
return request.getContent().flatMap(new Func1<ByteBuf, Observable<Void>>() {
@Override
public Observable<Void> call(ByteBuf byteBuf) {
String movieId = byteBuf.toString(Charset.defaultCharset());
System.out.println(format("Updating recommendation for user %s and movie %s ", userId, movieId));
System.out.println(format(" updating recommendation for <user=%s, movie=%s>", userId, movieId));
synchronized (this) {
Set<String> recommendations;
if (userRecommendations.containsKey(userId)) {
......@@ -156,11 +160,12 @@ public class RxMovieServer {
}
private Observable<Void> handleRegisterMovie(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
System.out.println("Http request -> register movie: " + request.getPath());
return request.getContent().flatMap(new Func1<ByteBuf, Observable<Void>>() {
@Override
public Observable<Void> call(ByteBuf byteBuf) {
String formatted = byteBuf.toString(Charset.defaultCharset());
System.out.println("Registering movie " + formatted);
System.out.println(" registering movie " + formatted);
try {
Movie movie = Movie.from(formatted);
movies.put(movie.getId(), movie);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册