提交 3ed64caf 编写于 作者: N Nikita Koksharov

Fixed - Topic channels connected to master node aren't resubscribed. #2825

上级 80728363
......@@ -570,8 +570,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
MasterSlaveEntry oldEntry = slot2entry.getAndSet(slot, entry);
if (oldEntry != entry) {
entry.incReference();
shutdownEntry(oldEntry);
}
shutdownEntry(oldEntry);
client2entry.put(entry.getClient(), entry);
}
......@@ -583,6 +583,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private void shutdownEntry(MasterSlaveEntry entry) {
if (entry != null && entry.decReference() == 0) {
client2entry.remove(entry.getClient());
entry.getAllEntries().forEach(e -> entry.nodeDown(e));
entry.masterDown();
entry.shutdownAsync();
String slaves = entry.getAllEntries().stream()
.filter(e -> !e.getClient().getAddr().equals(entry.getClient().getAddr()))
......
......@@ -179,19 +179,27 @@ public class MasterSlaveEntry {
return slaveDown(entry);
}
private boolean slaveDown(ClientConnectionsEntry entry) {
if (entry.isMasterForRead()) {
return false;
}
// add master as slave if no more slaves available
if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) {
if (slaveBalancer.unfreeze(masterEntry.getClient().getAddr(), FreezeReason.SYSTEM)) {
log.info("master {} used as slave", masterEntry.getClient().getAddr());
}
}
return nodeDown(entry);
}
public void masterDown() {
nodeDown(masterEntry);
}
public boolean nodeDown(ClientConnectionsEntry entry) {
entry.reset();
for (RedisConnection connection : entry.getAllConnections()) {
......@@ -205,7 +213,7 @@ public class MasterSlaveEntry {
}
}
entry.getAllConnections().clear();
for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) {
connection.closeAsync();
connectionManager.getSubscribeService().reattachPubSub(connection);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册