提交 3e7e35b4 编写于 作者: N Nikita Koksharov

Feature - RLock can't be acquired anymore if pubsub connection limit was...

Feature - RLock can't be acquired anymore if pubsub connection limit was reached. PubSub Lock entries memory-leak during Lock acquisition. #3577 (thanks to @zhwq1216)
上级 ac83c953
......@@ -31,7 +31,6 @@ import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.pool.MasterConnectionPool;
import org.redisson.connection.pool.MasterPubSubConnectionPool;
import org.redisson.misc.*;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -519,12 +518,12 @@ public class MasterSlaveEntry {
return slaveBalancer.nextPubSubConnection();
}
public void returnPubSubConnection(PubSubConnectionEntry entry) {
public void returnPubSubConnection(RedisPubSubConnection connection) {
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
pubSubConnectionPool.returnConnection(masterEntry, entry.getConnection());
pubSubConnectionPool.returnConnection(masterEntry, connection);
return;
}
slaveBalancer.returnPubSubConnection(entry.getConnection());
slaveBalancer.returnPubSubConnection(connection);
}
public void releaseWrite(RedisConnection connection) {
......
......@@ -247,12 +247,12 @@ public class PublishSubscribeService {
return;
}
MasterSlaveEntry msEntry = Optional.of(connectionManager.getEntry(entry.getClient()))
.filter(e -> e != entry).orElse(entry);
MasterSlaveEntry msEntry = Optional.ofNullable(connectionManager.getEntry(entry.getClient())).orElse(entry);
PubSubEntry freePubSubConnections = entry2PubSubConnection.getOrDefault(msEntry, new PubSubEntry());
PubSubConnectionEntry freeEntry = freePubSubConnections.getEntries().peek();
if (freeEntry == null) {
freePubSubLock.release();
connect(codec, channelName, msEntry, promise, type, lock, listeners);
return;
}
......@@ -355,51 +355,52 @@ public class PublishSubscribeService {
});
connFuture.onComplete((conn, ex) -> {
if (ex != null) {
freePubSubLock.release();
lock.release();
promise.tryFailure(ex);
return;
}
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
int remainFreeAmount = entry.tryAcquire();
freePubSubLock.acquire(() -> {
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
int remainFreeAmount = entry.tryAcquire();
PubSubKey key = new PubSubKey(channelName, msEntry);
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(key, entry);
if (oldEntry != null) {
msEntry.returnPubSubConnection(entry);
PubSubKey key = new PubSubKey(channelName, msEntry);
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(key, entry);
if (oldEntry != null) {
msEntry.returnPubSubConnection(conn);
freePubSubLock.release();
freePubSubLock.release();
addListeners(channelName, promise, type, lock, oldEntry, listeners);
return;
}
addListeners(channelName, promise, type, lock, oldEntry, listeners);
return;
}
if (remainFreeAmount > 0) {
addFreeConnectionEntry(channelName, entry);
}
freePubSubLock.release();
if (remainFreeAmount > 0) {
addFreeConnectionEntry(channelName, entry);
}
freePubSubLock.release();
RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners);
RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners);
ChannelFuture future;
if (PubSubType.PSUBSCRIBE == type) {
future = entry.psubscribe(codec, channelName);
} else {
future = entry.subscribe(codec, channelName);
}
ChannelFuture future;
if (PubSubType.PSUBSCRIBE == type) {
future = entry.psubscribe(codec, channelName);
} else {
future = entry.subscribe(codec, channelName);
}
future.addListener((ChannelFutureListener) future1 -> {
if (!future1.isSuccess()) {
if (!promise.isDone()) {
subscribeFuture.cancel(false);
future.addListener((ChannelFutureListener) future1 -> {
if (!future1.isSuccess()) {
if (!promise.isDone()) {
subscribeFuture.cancel(false);
}
return;
}
return;
}
connectionManager.newTimeout(timeout ->
subscribeFuture.cancel(false),
config.getTimeout(), TimeUnit.MILLISECONDS);
connectionManager.newTimeout(timeout ->
subscribeFuture.cancel(false),
config.getTimeout(), TimeUnit.MILLISECONDS);
});
});
});
}
......@@ -420,7 +421,8 @@ public class PublishSubscribeService {
executed.set(true);
if (entry.release() == 1) {
addFreeConnectionEntry(channelName, entry);
MasterSlaveEntry msEntry = getEntry(channelName);
msEntry.returnPubSubConnection(entry.getConnection());
}
result.trySuccess(null);
......
......@@ -8,8 +8,9 @@ import org.redisson.client.WriteRedisConnectionException;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
......@@ -49,6 +50,67 @@ public class RedissonLockTest extends BaseConcurrentTest {
}
}
public static class LockThread implements Runnable {
AtomicBoolean hasFails;
RedissonClient redissonClient;
String lockName;
public LockThread(AtomicBoolean hasFails, RedissonClient redissonClient, String lockName) {
this.hasFails = hasFails;
this.redissonClient = redissonClient;
this.lockName = lockName;
}
@Override
public void run() {
RLock lock = redissonClient.getLock(lockName);
try {
boolean bLocked = lock.tryLock(100, -1, TimeUnit.MILLISECONDS);
if (bLocked) {
lock.unlock();
} else {
hasFails.set(true);
}
} catch (Exception ex) {
hasFails.set(true);
}
}
}
@Test
public void testSinglePubSub() throws IOException, InterruptedException, ExecutionException {
RedisRunner.RedisProcess runner = new RedisRunner()
.port(RedisRunner.findFreePort())
.nosave()
.randomDir()
.run();
Config config = new Config();
config.useSingleServer()
.setAddress(runner.getRedisServerAddressAndPort())
.setSubscriptionConnectionPoolSize(1)
.setSubscriptionsPerConnection(1);
ExecutorService executorService = Executors.newFixedThreadPool(4);
RedissonClient redissonClient = Redisson.create(config);
AtomicBoolean hasFails = new AtomicBoolean();
for (int i = 0; i < 2; i++) {
Future<?> f1 = executorService.submit(new LockThread(hasFails, redissonClient, "Lock1_" + i));
Future<?> f2 = executorService.submit(new LockThread(hasFails, redissonClient, "Lock1_" + i));
Future<?> f3 = executorService.submit(new LockThread(hasFails, redissonClient, "Lock2_" + i));
Future<?> f4 = executorService.submit(new LockThread(hasFails, redissonClient, "Lock2_" + i));
f1.get();
f2.get();
f3.get();
f4.get();
}
assertThat(hasFails).isFalse();
redissonClient.shutdown();
runner.stop();
}
@Test
public void testRedisFailed() {
Assertions.assertThrows(WriteRedisConnectionException.class, () -> {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册