From f612b788330d1571e2933bbb4d62c508f1c8f9c9 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Fri, 18 Apr 2014 19:11:41 -0700 Subject: [PATCH] Added pool stats and JUnit --- .../netty/http/CachedNettyHttpClient.java | 65 +++++ .../client/netty/http/GlobalPoolStats.java | 222 ++++++++++++++++++ .../client/netty/http/NettyHttpClient.java | 30 ++- .../netflix/client/netty/NettyClientTest.java | 29 ++- 4 files changed, 333 insertions(+), 13 deletions(-) create mode 100644 ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/CachedNettyHttpClient.java create mode 100644 ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/GlobalPoolStats.java diff --git a/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/CachedNettyHttpClient.java b/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/CachedNettyHttpClient.java new file mode 100644 index 0000000..c744995 --- /dev/null +++ b/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/CachedNettyHttpClient.java @@ -0,0 +1,65 @@ +package com.netflix.client.netty.http; + + +import io.reactivex.netty.protocol.http.client.HttpClient; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.netflix.client.config.DefaultClientConfigImpl; +import com.netflix.client.config.IClientConfig; +import com.netflix.loadbalancer.Server; + +@SuppressWarnings("rawtypes") +public abstract class CachedNettyHttpClient extends AbstractNettyHttpClient implements Closeable { + + private ConcurrentHashMap rxClientCache; + + public CachedNettyHttpClient() { + this(DefaultClientConfigImpl.getClientConfigWithDefaultValues()); + } + + public CachedNettyHttpClient(IClientConfig config) { + super(config); + rxClientCache = new ConcurrentHashMap(); + } + + @SuppressWarnings("unchecked") + @Override + protected HttpClient getRxClient(String host, int port) { + Server server = new Server(host, port); + HttpClient client = rxClientCache.get(server); + if (client != null) { + return client; + } else { + client = createRxClient(server); + HttpClient old = rxClientCache.putIfAbsent(server, client); + if (old != null) { + return old; + } else { + return client; + } + } + } + + protected abstract HttpClient createRxClient(Server server); + + protected ConcurrentMap getCurrentHttpClients() { + return rxClientCache; + } + + protected HttpClient removeClient(Server server) { + HttpClient client = rxClientCache.remove(server); + client.shutdown(); + return client; + } + + @Override + public void close() throws IOException { + for (Server server: rxClientCache.keySet()) { + removeClient(server); + } + } +} diff --git a/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/GlobalPoolStats.java b/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/GlobalPoolStats.java new file mode 100644 index 0000000..3d3677c --- /dev/null +++ b/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/GlobalPoolStats.java @@ -0,0 +1,222 @@ +package com.netflix.client.netty.http; + +import rx.Observer; +import io.reactivex.netty.client.PoolInsightProvider; +import io.reactivex.netty.protocol.http.client.HttpClient; + +import com.netflix.numerus.LongAdder; +import com.netflix.servo.annotations.DataSourceType; +import com.netflix.servo.annotations.Monitor; +import com.netflix.servo.monitor.Monitors; + +public class GlobalPoolStats implements Observer { + + private final LongAdder creationCount = new LongAdder(); + private final LongAdder failedCount = new LongAdder(); + private final LongAdder reuseCount = new LongAdder(); + private final LongAdder evictionCount = new LongAdder(); + private final LongAdder acquireAttemptedCount = new LongAdder(); + private final LongAdder acquireSucceededCount = new LongAdder(); + private final LongAdder acquireFailedCount = new LongAdder(); + private final LongAdder releaseAttemptedCount = new LongAdder(); + private final LongAdder releaseSucceededCount = new LongAdder(); + private final LongAdder releaseFailedCount = new LongAdder(); + + private final NettyHttpClient client; + + public GlobalPoolStats(String name, NettyHttpClient client) { + Monitors.registerObject(name, this); + this.client = client; + } + + public void onConnectionCreation() { + creationCount.increment(); + } + + public void onConnectFailed() { + failedCount.increment(); + } + + public void onConnectionReuse() { + reuseCount.increment(); + } + + public void onConnectionEviction() { + System.err.println("onConnectionEviction"); + evictionCount.increment(); + } + + public void onAcquireAttempted() { + System.err.println("onAcquireAttempted"); + acquireAttemptedCount.increment(); + } + + public void onAcquireSucceeded() { + System.err.println("onAcquireSucceeded"); + acquireSucceededCount.increment(); + } + + public void onAcquireFailed() { + acquireFailedCount.increment(); + } + + public void onReleaseAttempted() { + System.err.println("onReleaseAttempted"); + releaseAttemptedCount.increment(); + } + + public void onReleaseSucceeded() { + System.err.println("onReleaseSucceeded"); + releaseSucceededCount.increment(); + } + + public void onReleaseFailed() { + System.err.println("onReleaseAttempted"); + releaseFailedCount.increment(); + } + + @Monitor(name="AcquireAttempt", type=DataSourceType.COUNTER) + public long getAcquireAttemptedCount() { + return acquireAttemptedCount.longValue(); + } + + @Monitor(name="AcquireFailed", type=DataSourceType.COUNTER) + public long getAcquireFailedCount() { + return acquireFailedCount.longValue(); + } + + @Monitor(name="AcquireSucceeded", type=DataSourceType.COUNTER) + public long getAcquireSucceededCount() { + return acquireSucceededCount.longValue(); + } + + @Monitor(name="Creation", type=DataSourceType.COUNTER) + public long getCreationCount() { + return creationCount.longValue(); + } + + @Monitor(name="Deletion", type=DataSourceType.COUNTER) + public long getEvictionCount() { + return evictionCount.longValue(); + } + + @Monitor(name="ConnectionFailed", type=DataSourceType.COUNTER) + public long getFailedCount() { + return failedCount.longValue(); + } + + @Monitor(name="ReleaseAttempted", type=DataSourceType.COUNTER) + public long getReleaseAttemptedCount() { + return releaseAttemptedCount.longValue(); + } + + @Monitor(name="ReleaseFailed", type=DataSourceType.COUNTER) + public long getReleaseFailedCount() { + return releaseFailedCount.longValue(); + } + + @Monitor(name="ReleaseSucceeded", type=DataSourceType.COUNTER) + public long getReleaseSucceededCount() { + return releaseSucceededCount.longValue(); + } + + @Monitor(name="Reuse", type=DataSourceType.COUNTER) + public long getReuseCount() { + return reuseCount.longValue(); + } + + @Monitor(name="InUse", type=DataSourceType.GAUGE) + public long getInUseCount() { + long total = 0; + for (HttpClient rxclient: client.getCurrentHttpClients().values()) { + total += rxclient.getStats().getInUseCount(); + } + return total; + } + + @Monitor(name="Idle", type=DataSourceType.GAUGE) + public long getIdleCount() { + long total = 0; + for (HttpClient rxclient: client.getCurrentHttpClients().values()) { + total += rxclient.getStats().getIdleCount(); + } + return total; + } + + @Monitor(name="Total", type=DataSourceType.GAUGE) + public long getTotalConnectionCount() { + long total = 0; + for (HttpClient rxclient: client.getCurrentHttpClients().values()) { + total += rxclient.getStats().getTotalConnectionCount(); + } + return total; + } + + @Monitor(name="PendingAccquire", type=DataSourceType.GAUGE) + public long getPendingAcquireRequestCount() { + long total = 0; + for (HttpClient rxclient: client.getCurrentHttpClients().values()) { + total += rxclient.getStats().getPendingAcquireRequestCount(); + } + return total; + } + + @Monitor(name="PendingRelease", type=DataSourceType.GAUGE) + public long getPendingReleaseRequestCount() { + long total = 0; + for (HttpClient rxclient: client.getCurrentHttpClients().values()) { + total += rxclient.getStats().getPendingReleaseRequestCount(); + } + return total; + } + + @Monitor(name="MaxTotalConnections", type=DataSourceType.GAUGE) + public int getMaxTotalConnections() { + return client.getMaxTotalConnections(); + } + + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + } + + + @Override + public void onNext(PoolInsightProvider.PoolStateChangeEvent stateChangeEvent) { + switch (stateChangeEvent) { + case NewConnectionCreated: + onConnectionCreation(); + break; + case ConnectFailed: + onConnectFailed(); + break; + case OnConnectionReuse: + onConnectionReuse(); + break; + case OnConnectionEviction: + onConnectionEviction(); + break; + case onAcquireAttempted: + onAcquireAttempted(); + break; + case onAcquireSucceeded: + onAcquireSucceeded(); + break; + case onAcquireFailed: + onAcquireFailed(); + break; + case onReleaseAttempted: + onReleaseAttempted(); + break; + case onReleaseSucceeded: + onReleaseSucceeded(); + break; + case onReleaseFailed: + onReleaseFailed(); + break; + } + } +} diff --git a/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/NettyHttpClient.java b/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/NettyHttpClient.java index afcd06e..8dc5e0e 100644 --- a/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/NettyHttpClient.java +++ b/ribbon-rxnetty/src/main/java/com/netflix/client/netty/http/NettyHttpClient.java @@ -84,9 +84,11 @@ import com.netflix.utils.ScheduledThreadPoolExectuorWithDynamicSize; */ public class NettyHttpClient extends CachedNettyHttpClient { - private CompositePoolLimitDeterminationStrategy poolStrategy; - private MaxConnectionsBasedStrategy globalStrategy; - private int idleConnectionEvictionMills; + private final CompositePoolLimitDeterminationStrategy poolStrategy; + private final MaxConnectionsBasedStrategy globalStrategy; + private final int idleConnectionEvictionMills; + private final GlobalPoolStats stats; + private static final ScheduledExecutorService poolCleanerScheduler; private static final DynamicIntProperty POOL_CLEANER_CORE_SIZE = new DynamicIntProperty("rxNetty.poolCleaner.coreSize", 2); @@ -110,6 +112,7 @@ public class NettyHttpClient extends CachedNettyHttpClient { globalStrategy = new MaxConnectionsBasedStrategy(maxTotalConnections); poolStrategy = new CompositePoolLimitDeterminationStrategy(perHostStrategy, globalStrategy); idleConnectionEvictionMills = config.getPropertyWithType(CommonKeys.ConnIdleEvictTimeMilliSeconds, DefaultClientConfigImpl.DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS); + stats = new GlobalPoolStats(config.getClientName(), this); } @Override @@ -128,6 +131,7 @@ public class NettyHttpClient extends CachedNettyHttpClient { .withIdleConnectionsTimeoutMillis(idleConnectionEvictionMills) .withPoolIdleCleanupScheduler(poolCleanerScheduler) .build(); + client.poolStateChangeObservable().subscribe(stats); return client; } @@ -143,22 +147,24 @@ public class NettyHttpClient extends CachedNettyHttpClient { return super.submit(host, port, request, requestConfig); } + protected void setMaxTotalConnections(int newMax) { + globalStrategy.incrementMaxConnections(newMax - getMaxTotalConnections()); + } + + public int getMaxTotalConnections() { + return globalStrategy.getMaxConnections(); + } - @SuppressWarnings("rawtypes") public int getIdleConnectionsInPool() { int total = 0; - for (Map.Entry entry: getCurrentHttpClients().entrySet()) { - PoolStats poolStats = entry.getValue().getStats(); + for (HttpClient client: getCurrentHttpClients().values()) { + PoolStats poolStats = client.getStats(); total += poolStats.getIdleCount(); } return total; } - - protected void setMaxTotalConnections(int newMax) { - globalStrategy.incrementMaxConnections(newMax - getMaxTotalConnections()); - } - public int getMaxTotalConnections() { - return globalStrategy.getMaxConnections(); + public GlobalPoolStats getGlobalPoolStats() { + return stats; } } diff --git a/ribbon-rxnetty/src/test/java/com/netflix/client/netty/NettyClientTest.java b/ribbon-rxnetty/src/test/java/com/netflix/client/netty/NettyClientTest.java index 1e0a99d..cd7b51a 100644 --- a/ribbon-rxnetty/src/test/java/com/netflix/client/netty/NettyClientTest.java +++ b/ribbon-rxnetty/src/test/java/com/netflix/client/netty/NettyClientTest.java @@ -128,9 +128,36 @@ public class NettyClientTest { Observable> response = observableClient.submit(host, port, request); Person person = getPersonObservable(response).toBlockingObservable().single(); assertEquals(EmbeddedResources.defaultPerson, person); - assertEquals(1, observableClient.getIdleConnectionsInPool()); + // need to sleep to wait until connection is released + Thread.sleep(1000); + assertEquals(1, observableClient.getGlobalPoolStats().getIdleCount()); + assertEquals(1, observableClient.getGlobalPoolStats().getAcquireSucceededCount()); + assertEquals(1, observableClient.getGlobalPoolStats().getReleaseSucceededCount()); + assertEquals(1, observableClient.getGlobalPoolStats().getTotalConnectionCount()); } + + @Test + public void testPoolReuse() throws Exception { + HttpClientRequest request = HttpClientRequest.createGet(SERVICE_URI + "testAsync/person"); + NettyHttpClient observableClient = NettyHttpClientBuilder.newBuilder().buildHttpClient(); + // final List result = Lists.newArrayList(); + Observable> response = observableClient.submit(host, port, request); + Person person = getPersonObservable(response).toBlockingObservable().single(); + assertEquals(EmbeddedResources.defaultPerson, person); + Thread.sleep(1000); + assertEquals(1, observableClient.getGlobalPoolStats().getIdleCount()); + response = observableClient.submit(host, port, request); + person = getPersonObservable(response).toBlockingObservable().single(); + assertEquals(EmbeddedResources.defaultPerson, person); + Thread.sleep(1000); + assertEquals(2, observableClient.getGlobalPoolStats().getAcquireSucceededCount()); + assertEquals(2, observableClient.getGlobalPoolStats().getReleaseSucceededCount()); + assertEquals(1, observableClient.getGlobalPoolStats().getTotalConnectionCount()); + assertEquals(1, observableClient.getGlobalPoolStats().getReuseCount()); + } + + @Test public void testPostWithObservable() throws Exception { Person myPerson = new Person("netty", 5); -- GitLab