提交 2d02d8fb 编写于 作者: N Nikita Koksharov

Fixed - frequent Redis master failover causes memory leak in IdleConnectionWatcher. #3581

上级 5206a5b6
......@@ -67,12 +67,12 @@ public class ClientConnectionsEntry {
this.freeSubscribeConnectionsCounter = new AsyncSemaphore(subscribePoolMaxSize);
if (subscribePoolMaxSize > 0) {
connectionManager.getConnectionWatcher().add(subscribePoolMinSize, subscribePoolMaxSize, freeSubscribeConnections, freeSubscribeConnectionsCounter, c -> {
connectionManager.getConnectionWatcher().add(this, subscribePoolMinSize, subscribePoolMaxSize, freeSubscribeConnections, freeSubscribeConnectionsCounter, c -> {
freeSubscribeConnections.remove(c);
return allSubscribeConnections.remove(c);
});
}
connectionManager.getConnectionWatcher().add(poolMinSize, poolMaxSize, freeConnections, freeConnectionsCounter, c -> {
connectionManager.getConnectionWatcher().add(this, poolMinSize, poolMaxSize, freeConnections, freeConnectionsCounter, c -> {
freeConnections.remove(c);
return allConnections.remove(c);
});
......@@ -115,6 +115,11 @@ public class ClientConnectionsEntry {
firstFailTime.compareAndSet(0, System.currentTimeMillis());
}
public RFuture<Void> shutdownAsync() {
connectionManager.getConnectionWatcher().remove(this);
return client.shutdownAsync();
}
public RedisClient getClient() {
return client;
}
......
......@@ -26,10 +26,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
public class IdleConnectionWatcher {
......@@ -55,37 +58,32 @@ public class IdleConnectionWatcher {
};
private final Queue<Entry> entries = new ConcurrentLinkedQueue<>();
private final Map<ClientConnectionsEntry, Queue<Entry>> entries = new ConcurrentHashMap<>();
private final ScheduledFuture<?> monitorFuture;
public IdleConnectionWatcher(ConnectionManager manager, MasterSlaveServersConfig config) {
monitorFuture = manager.getGroup().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
long currTime = System.nanoTime();
for (Entry entry : entries) {
if (!validateAmount(entry)) {
continue;
}
monitorFuture = manager.getGroup().scheduleWithFixedDelay(() -> {
long currTime = System.nanoTime();
for (Entry entry : entries.values().stream().flatMap(m -> m.stream()).collect(Collectors.toList())) {
if (!validateAmount(entry)) {
continue;
}
for (RedisConnection c : entry.connections) {
long timeInPool = TimeUnit.NANOSECONDS.toMillis(currTime - c.getLastUsageTime());
if (timeInPool > config.getIdleConnectionTimeout()
&& validateAmount(entry)
&& entry.deleteHandler.apply(c)) {
ChannelFuture future = c.closeAsync();
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
log.debug("Connection {} has been closed due to idle timeout. Not used for {} ms", c.getChannel(), timeInPool);
}
});
}
for (RedisConnection c : entry.connections) {
long timeInPool = TimeUnit.NANOSECONDS.toMillis(currTime - c.getLastUsageTime());
if (timeInPool > config.getIdleConnectionTimeout()
&& validateAmount(entry)
&& entry.deleteHandler.apply(c)) {
ChannelFuture future = c.closeAsync();
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
log.debug("Connection {} has been closed due to idle timeout. Not used for {} ms", c.getChannel(), timeInPool);
}
});
}
}
}
}, config.getIdleConnectionTimeout(), config.getIdleConnectionTimeout(), TimeUnit.MILLISECONDS);
}
......@@ -93,9 +91,14 @@ public class IdleConnectionWatcher {
return entry.maximumAmount - entry.freeConnectionsCounter.getCounter() + entry.connections.size() > entry.minimumAmount;
}
public void add(int minimumAmount, int maximumAmount, Collection<? extends RedisConnection> connections,
public void remove(ClientConnectionsEntry entry) {
entries.remove(entry);
}
public void add(ClientConnectionsEntry entry, int minimumAmount, int maximumAmount, Collection<? extends RedisConnection> connections,
AsyncSemaphore freeConnectionsCounter, Function<RedisConnection, Boolean> deleteHandler) {
entries.add(new Entry(minimumAmount, maximumAmount, connections, freeConnectionsCounter, deleteHandler));
Queue<Entry> list = entries.computeIfAbsent(entry, k -> new ConcurrentLinkedQueue<>());
list.add(new Entry(minimumAmount, maximumAmount, connections, freeConnectionsCounter, deleteHandler));
}
public void stop() {
......
......@@ -123,11 +123,11 @@ public class MasterSlaveEntry {
}
masterEntry = new ClientConnectionsEntry(
client,
config.getMasterConnectionMinimumIdleSize(),
client,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(),
config.getSubscriptionConnectionPoolSize(),
connectionManager,
NodeType.MASTER);
......@@ -440,7 +440,7 @@ public class MasterSlaveEntry {
if (oldMaster != masterEntry) {
writeConnectionPool.remove(masterEntry);
pubSubConnectionPool.remove(masterEntry);
masterEntry.getClient().shutdownAsync();
masterEntry.shutdownAsync();
masterEntry = oldMaster;
}
log.error("Unable to change master from: " + oldMaster.getClient().getAddr() + " to: " + address, e);
......@@ -465,7 +465,7 @@ public class MasterSlaveEntry {
&& slaveBalancer.getAvailableClients() > 1) {
slaveDown(newMasterClient.getAddr(), FreezeReason.SYSTEM);
}
oldMaster.getClient().shutdownAsync();
oldMaster.shutdownAsync();
log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), masterEntry.getClient().getAddr());
});
}
......@@ -478,7 +478,7 @@ public class MasterSlaveEntry {
RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, 2);
if (masterEntry != null) {
masterEntry.getClient().shutdownAsync().onComplete(listener);
masterEntry.shutdownAsync().onComplete(listener);
}
slaveBalancer.shutdownAsync().onComplete(listener);
return result;
......
......@@ -294,7 +294,7 @@ public class LoadBalancerManager {
RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, client2Entry.values().size());
for (ClientConnectionsEntry entry : client2Entry.values()) {
entry.getClient().shutdownAsync().onComplete(listener);
entry.shutdownAsync().onComplete(listener);
}
return result;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册