提交 f612b788 编写于 作者: A Allen Wang

Added pool stats and JUnit

上级 fc3aab12
package com.netflix.client.netty.http;
import io.reactivex.netty.protocol.http.client.HttpClient;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.Server;
@SuppressWarnings("rawtypes")
public abstract class CachedNettyHttpClient<O> extends AbstractNettyHttpClient<O> implements Closeable {
private ConcurrentHashMap<Server, HttpClient> rxClientCache;
public CachedNettyHttpClient() {
this(DefaultClientConfigImpl.getClientConfigWithDefaultValues());
}
public CachedNettyHttpClient(IClientConfig config) {
super(config);
rxClientCache = new ConcurrentHashMap<Server, HttpClient>();
}
@SuppressWarnings("unchecked")
@Override
protected <I> HttpClient<I, O> getRxClient(String host, int port) {
Server server = new Server(host, port);
HttpClient client = rxClientCache.get(server);
if (client != null) {
return client;
} else {
client = createRxClient(server);
HttpClient old = rxClientCache.putIfAbsent(server, client);
if (old != null) {
return old;
} else {
return client;
}
}
}
protected abstract <I> HttpClient<I,O> createRxClient(Server server);
protected ConcurrentMap<Server, HttpClient> getCurrentHttpClients() {
return rxClientCache;
}
protected HttpClient removeClient(Server server) {
HttpClient client = rxClientCache.remove(server);
client.shutdown();
return client;
}
@Override
public void close() throws IOException {
for (Server server: rxClientCache.keySet()) {
removeClient(server);
}
}
}
package com.netflix.client.netty.http;
import rx.Observer;
import io.reactivex.netty.client.PoolInsightProvider;
import io.reactivex.netty.protocol.http.client.HttpClient;
import com.netflix.numerus.LongAdder;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import com.netflix.servo.monitor.Monitors;
public class GlobalPoolStats implements Observer<PoolInsightProvider.PoolStateChangeEvent> {
private final LongAdder creationCount = new LongAdder();
private final LongAdder failedCount = new LongAdder();
private final LongAdder reuseCount = new LongAdder();
private final LongAdder evictionCount = new LongAdder();
private final LongAdder acquireAttemptedCount = new LongAdder();
private final LongAdder acquireSucceededCount = new LongAdder();
private final LongAdder acquireFailedCount = new LongAdder();
private final LongAdder releaseAttemptedCount = new LongAdder();
private final LongAdder releaseSucceededCount = new LongAdder();
private final LongAdder releaseFailedCount = new LongAdder();
private final NettyHttpClient client;
public GlobalPoolStats(String name, NettyHttpClient client) {
Monitors.registerObject(name, this);
this.client = client;
}
public void onConnectionCreation() {
creationCount.increment();
}
public void onConnectFailed() {
failedCount.increment();
}
public void onConnectionReuse() {
reuseCount.increment();
}
public void onConnectionEviction() {
System.err.println("onConnectionEviction");
evictionCount.increment();
}
public void onAcquireAttempted() {
System.err.println("onAcquireAttempted");
acquireAttemptedCount.increment();
}
public void onAcquireSucceeded() {
System.err.println("onAcquireSucceeded");
acquireSucceededCount.increment();
}
public void onAcquireFailed() {
acquireFailedCount.increment();
}
public void onReleaseAttempted() {
System.err.println("onReleaseAttempted");
releaseAttemptedCount.increment();
}
public void onReleaseSucceeded() {
System.err.println("onReleaseSucceeded");
releaseSucceededCount.increment();
}
public void onReleaseFailed() {
System.err.println("onReleaseAttempted");
releaseFailedCount.increment();
}
@Monitor(name="AcquireAttempt", type=DataSourceType.COUNTER)
public long getAcquireAttemptedCount() {
return acquireAttemptedCount.longValue();
}
@Monitor(name="AcquireFailed", type=DataSourceType.COUNTER)
public long getAcquireFailedCount() {
return acquireFailedCount.longValue();
}
@Monitor(name="AcquireSucceeded", type=DataSourceType.COUNTER)
public long getAcquireSucceededCount() {
return acquireSucceededCount.longValue();
}
@Monitor(name="Creation", type=DataSourceType.COUNTER)
public long getCreationCount() {
return creationCount.longValue();
}
@Monitor(name="Deletion", type=DataSourceType.COUNTER)
public long getEvictionCount() {
return evictionCount.longValue();
}
@Monitor(name="ConnectionFailed", type=DataSourceType.COUNTER)
public long getFailedCount() {
return failedCount.longValue();
}
@Monitor(name="ReleaseAttempted", type=DataSourceType.COUNTER)
public long getReleaseAttemptedCount() {
return releaseAttemptedCount.longValue();
}
@Monitor(name="ReleaseFailed", type=DataSourceType.COUNTER)
public long getReleaseFailedCount() {
return releaseFailedCount.longValue();
}
@Monitor(name="ReleaseSucceeded", type=DataSourceType.COUNTER)
public long getReleaseSucceededCount() {
return releaseSucceededCount.longValue();
}
@Monitor(name="Reuse", type=DataSourceType.COUNTER)
public long getReuseCount() {
return reuseCount.longValue();
}
@Monitor(name="InUse", type=DataSourceType.GAUGE)
public long getInUseCount() {
long total = 0;
for (HttpClient<?, ?> rxclient: client.getCurrentHttpClients().values()) {
total += rxclient.getStats().getInUseCount();
}
return total;
}
@Monitor(name="Idle", type=DataSourceType.GAUGE)
public long getIdleCount() {
long total = 0;
for (HttpClient<?, ?> rxclient: client.getCurrentHttpClients().values()) {
total += rxclient.getStats().getIdleCount();
}
return total;
}
@Monitor(name="Total", type=DataSourceType.GAUGE)
public long getTotalConnectionCount() {
long total = 0;
for (HttpClient<?, ?> rxclient: client.getCurrentHttpClients().values()) {
total += rxclient.getStats().getTotalConnectionCount();
}
return total;
}
@Monitor(name="PendingAccquire", type=DataSourceType.GAUGE)
public long getPendingAcquireRequestCount() {
long total = 0;
for (HttpClient<?, ?> rxclient: client.getCurrentHttpClients().values()) {
total += rxclient.getStats().getPendingAcquireRequestCount();
}
return total;
}
@Monitor(name="PendingRelease", type=DataSourceType.GAUGE)
public long getPendingReleaseRequestCount() {
long total = 0;
for (HttpClient<?, ?> rxclient: client.getCurrentHttpClients().values()) {
total += rxclient.getStats().getPendingReleaseRequestCount();
}
return total;
}
@Monitor(name="MaxTotalConnections", type=DataSourceType.GAUGE)
public int getMaxTotalConnections() {
return client.getMaxTotalConnections();
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(PoolInsightProvider.PoolStateChangeEvent stateChangeEvent) {
switch (stateChangeEvent) {
case NewConnectionCreated:
onConnectionCreation();
break;
case ConnectFailed:
onConnectFailed();
break;
case OnConnectionReuse:
onConnectionReuse();
break;
case OnConnectionEviction:
onConnectionEviction();
break;
case onAcquireAttempted:
onAcquireAttempted();
break;
case onAcquireSucceeded:
onAcquireSucceeded();
break;
case onAcquireFailed:
onAcquireFailed();
break;
case onReleaseAttempted:
onReleaseAttempted();
break;
case onReleaseSucceeded:
onReleaseSucceeded();
break;
case onReleaseFailed:
onReleaseFailed();
break;
}
}
}
......@@ -84,9 +84,11 @@ import com.netflix.utils.ScheduledThreadPoolExectuorWithDynamicSize;
*/
public class NettyHttpClient extends CachedNettyHttpClient<ByteBuf> {
private CompositePoolLimitDeterminationStrategy poolStrategy;
private MaxConnectionsBasedStrategy globalStrategy;
private int idleConnectionEvictionMills;
private final CompositePoolLimitDeterminationStrategy poolStrategy;
private final MaxConnectionsBasedStrategy globalStrategy;
private final int idleConnectionEvictionMills;
private final GlobalPoolStats stats;
private static final ScheduledExecutorService poolCleanerScheduler;
private static final DynamicIntProperty POOL_CLEANER_CORE_SIZE = new DynamicIntProperty("rxNetty.poolCleaner.coreSize", 2);
......@@ -110,6 +112,7 @@ public class NettyHttpClient extends CachedNettyHttpClient<ByteBuf> {
globalStrategy = new MaxConnectionsBasedStrategy(maxTotalConnections);
poolStrategy = new CompositePoolLimitDeterminationStrategy(perHostStrategy, globalStrategy);
idleConnectionEvictionMills = config.getPropertyWithType(CommonKeys.ConnIdleEvictTimeMilliSeconds, DefaultClientConfigImpl.DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS);
stats = new GlobalPoolStats(config.getClientName(), this);
}
@Override
......@@ -128,6 +131,7 @@ public class NettyHttpClient extends CachedNettyHttpClient<ByteBuf> {
.withIdleConnectionsTimeoutMillis(idleConnectionEvictionMills)
.withPoolIdleCleanupScheduler(poolCleanerScheduler)
.build();
client.poolStateChangeObservable().subscribe(stats);
return client;
}
......@@ -143,22 +147,24 @@ public class NettyHttpClient extends CachedNettyHttpClient<ByteBuf> {
return super.submit(host, port, request, requestConfig);
}
protected void setMaxTotalConnections(int newMax) {
globalStrategy.incrementMaxConnections(newMax - getMaxTotalConnections());
}
public int getMaxTotalConnections() {
return globalStrategy.getMaxConnections();
}
@SuppressWarnings("rawtypes")
public int getIdleConnectionsInPool() {
int total = 0;
for (Map.Entry<Server, HttpClient> entry: getCurrentHttpClients().entrySet()) {
PoolStats poolStats = entry.getValue().getStats();
for (HttpClient client: getCurrentHttpClients().values()) {
PoolStats poolStats = client.getStats();
total += poolStats.getIdleCount();
}
return total;
}
protected void setMaxTotalConnections(int newMax) {
globalStrategy.incrementMaxConnections(newMax - getMaxTotalConnections());
}
public int getMaxTotalConnections() {
return globalStrategy.getMaxConnections();
public GlobalPoolStats getGlobalPoolStats() {
return stats;
}
}
......@@ -128,9 +128,36 @@ public class NettyClientTest {
Observable<HttpClientResponse<ByteBuf>> response = observableClient.submit(host, port, request);
Person person = getPersonObservable(response).toBlockingObservable().single();
assertEquals(EmbeddedResources.defaultPerson, person);
assertEquals(1, observableClient.getIdleConnectionsInPool());
// need to sleep to wait until connection is released
Thread.sleep(1000);
assertEquals(1, observableClient.getGlobalPoolStats().getIdleCount());
assertEquals(1, observableClient.getGlobalPoolStats().getAcquireSucceededCount());
assertEquals(1, observableClient.getGlobalPoolStats().getReleaseSucceededCount());
assertEquals(1, observableClient.getGlobalPoolStats().getTotalConnectionCount());
}
@Test
public void testPoolReuse() throws Exception {
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(SERVICE_URI + "testAsync/person");
NettyHttpClient observableClient = NettyHttpClientBuilder.newBuilder().buildHttpClient();
// final List<Person> result = Lists.newArrayList();
Observable<HttpClientResponse<ByteBuf>> response = observableClient.submit(host, port, request);
Person person = getPersonObservable(response).toBlockingObservable().single();
assertEquals(EmbeddedResources.defaultPerson, person);
Thread.sleep(1000);
assertEquals(1, observableClient.getGlobalPoolStats().getIdleCount());
response = observableClient.submit(host, port, request);
person = getPersonObservable(response).toBlockingObservable().single();
assertEquals(EmbeddedResources.defaultPerson, person);
Thread.sleep(1000);
assertEquals(2, observableClient.getGlobalPoolStats().getAcquireSucceededCount());
assertEquals(2, observableClient.getGlobalPoolStats().getReleaseSucceededCount());
assertEquals(1, observableClient.getGlobalPoolStats().getTotalConnectionCount());
assertEquals(1, observableClient.getGlobalPoolStats().getReuseCount());
}
@Test
public void testPostWithObservable() throws Exception {
Person myPerson = new Person("netty", 5);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册