未验证 提交 f869a95c 编写于 作者: P Potato 提交者: GitHub

[IOTDB-6119] Add ConfigNode leader service check (#10985)

上级 1357cede
...@@ -198,33 +198,7 @@ public class ConfigRegionStateMachine ...@@ -198,33 +198,7 @@ public class ConfigRegionStateMachine
// We get currentNodeId here because the currentNodeId // We get currentNodeId here because the currentNodeId
// couldn't initialize earlier than the ConfigRegionStateMachine // couldn't initialize earlier than the ConfigRegionStateMachine
int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(); int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
if (currentNodeId != newLeaderId) {
if (currentNodeId == newLeaderId) {
LOGGER.info(
"Current node [nodeId: {}, ip:port: {}] becomes Leader",
newLeaderId,
currentNodeTEndPoint);
// Always start load services first
configManager.getLoadManager().startLoadServices();
// Start leader scheduling services
configManager.getProcedureManager().shiftExecutor(true);
configManager.getRetryFailedTasksThread().startRetryFailedTasksService();
configManager.getPartitionManager().startRegionCleaner();
// we do cq recovery async for two reasons:
// 1. For performance: cq recovery may be time-consuming, we use another thread to do it in
// make notifyLeaderChanged not blocked by it
// 2. For correctness: in cq recovery processing, it will use ConsensusManager which may be
// initialized after notifyLeaderChanged finished
threadPool.submit(() -> configManager.getCQManager().startCQScheduler());
threadPool.submit(
() -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync());
threadPool.submit(
() -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat());
} else {
LOGGER.info( LOGGER.info(
"Current node [nodeId:{}, ip:port: {}] is not longer the leader, " "Current node [nodeId:{}, ip:port: {}] is not longer the leader, "
+ "the new leader is [nodeId:{}]", + "the new leader is [nodeId:{}]",
...@@ -244,6 +218,34 @@ public class ConfigRegionStateMachine ...@@ -244,6 +218,34 @@ public class ConfigRegionStateMachine
} }
} }
@Override
public void notifyLeaderReady() {
// Called by the consensus layer once this node, already elected leader, is ready to
// serve; starts every leader-only background service of the ConfigNode.
LOGGER.info(
"Current node [nodeId: {}, ip:port: {}] becomes Leader",
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
currentNodeTEndPoint);
// Always start load services first
configManager.getLoadManager().startLoadServices();
// Start leader scheduling services
configManager.getProcedureManager().shiftExecutor(true);
configManager.getRetryFailedTasksThread().startRetryFailedTasksService();
configManager.getPartitionManager().startRegionCleaner();
// We recover CQs asynchronously for two reasons:
// 1. Performance: CQ recovery may be time-consuming; running it on another thread keeps
// this leader-ready callback from being blocked by it.
// 2. Correctness: CQ recovery uses the ConsensusManager, which may only finish
// initializing after this callback has returned.
threadPool.submit(() -> configManager.getCQManager().startCQScheduler());
threadPool.submit(
() -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync());
threadPool.submit(
() -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat());
}
@Override @Override
public void start() { public void start() {
if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) { if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
......
...@@ -336,6 +336,10 @@ public class ConsensusManager { ...@@ -336,6 +336,10 @@ public class ConsensusManager {
return consensusImpl.isLeader(DEFAULT_CONSENSUS_GROUP_ID); return consensusImpl.isLeader(DEFAULT_CONSENSUS_GROUP_ID);
} }
/**
 * @return whether the underlying consensus implementation reports this node as a ready leader of
 *     the default (config-region) consensus group
 */
public boolean isLeaderReady() {
return consensusImpl.isLeaderReady(DEFAULT_CONSENSUS_GROUP_ID);
}
/** @return ConfigNode-leader's location if leader exists, null otherwise. */ /** @return ConfigNode-leader's location if leader exists, null otherwise. */
public TConfigNodeLocation getLeader() { public TConfigNodeLocation getLeader() {
for (int retry = 0; retry < 50; retry++) { for (int retry = 0; retry < 50; retry++) {
...@@ -366,25 +370,28 @@ public class ConsensusManager { ...@@ -366,25 +370,28 @@ public class ConsensusManager {
/** /**
* Confirm the current ConfigNode's leadership. * Confirm the current ConfigNode's leadership.
* *
* @return SUCCESS_STATUS if the current ConfigNode is leader, NEED_REDIRECTION otherwise * @return SUCCESS_STATUS if the current ConfigNode is leader and has been ready yet,
* NEED_REDIRECTION otherwise
*/ */
public TSStatus confirmLeader() { public TSStatus confirmLeader() {
TSStatus result = new TSStatus(); TSStatus result = new TSStatus();
if (isLeaderReady()) {
if (isLeader()) { result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
return result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else { } else {
result.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); result.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
result.setMessage( if (isLeader()) {
"The current ConfigNode is not leader, please redirect to a new ConfigNode."); result.setMessage(
"The current ConfigNode is leader but not ready yet, please try again later.");
} else {
result.setMessage(
"The current ConfigNode is not leader, please redirect to a new ConfigNode.");
}
TConfigNodeLocation leaderLocation = getLeader(); TConfigNodeLocation leaderLocation = getLeader();
if (leaderLocation != null) { if (leaderLocation != null) {
result.setRedirectNode(leaderLocation.getInternalEndPoint()); result.setRedirectNode(leaderLocation.getInternalEndPoint());
} }
return result;
} }
return result;
} }
public ConsensusGroupId getConsensusGroupId() { public ConsensusGroupId getConsensusGroupId() {
......
...@@ -114,15 +114,6 @@ public class CQManager { ...@@ -114,15 +114,6 @@ public class CQManager {
} }
public void startCQScheduler() { public void startCQScheduler() {
try {
/*
TODO: remove this after fixing IOTDB-6085
sleep here because IOTDB-6085: NullPointerException in readAsync when Ratis leader is changing
*/
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
lock.writeLock().lock(); lock.writeLock().lock();
try { try {
// 1. shutdown previous cq schedule thread pool // 1. shutdown previous cq schedule thread pool
......
...@@ -168,6 +168,11 @@ public interface IStateMachine { ...@@ -168,6 +168,11 @@ public interface IStateMachine {
default void notifyConfigurationChanged(long term, long index, List<Peer> newConfiguration) { default void notifyConfigurationChanged(long term, long index, List<Peer> newConfiguration) {
// do nothing default // do nothing default
} }
/**
 * Notify the {@link IStateMachine} that this server has become ready to serve after being
 * elected leader. The default implementation is a no-op.
 */
default void notifyLeaderReady() {
// do nothing by default
}
} }
/** /**
......
...@@ -41,6 +41,7 @@ public class RatisConfig { ...@@ -41,6 +41,7 @@ public class RatisConfig {
private final Impl impl; private final Impl impl;
private final LeaderLogAppender leaderLogAppender; private final LeaderLogAppender leaderLogAppender;
private final Read read; private final Read read;
private final Utils utils;
private RatisConfig( private RatisConfig(
Rpc rpc, Rpc rpc,
...@@ -52,7 +53,8 @@ public class RatisConfig { ...@@ -52,7 +53,8 @@ public class RatisConfig {
Client client, Client client,
Impl impl, Impl impl,
LeaderLogAppender leaderLogAppender, LeaderLogAppender leaderLogAppender,
Read read) { Read read,
Utils utils) {
this.rpc = rpc; this.rpc = rpc;
this.leaderElection = leaderElection; this.leaderElection = leaderElection;
this.snapshot = snapshot; this.snapshot = snapshot;
...@@ -63,6 +65,7 @@ public class RatisConfig { ...@@ -63,6 +65,7 @@ public class RatisConfig {
this.impl = impl; this.impl = impl;
this.leaderLogAppender = leaderLogAppender; this.leaderLogAppender = leaderLogAppender;
this.read = read; this.read = read;
this.utils = utils;
} }
public Rpc getRpc() { public Rpc getRpc() {
...@@ -105,6 +108,10 @@ public class RatisConfig { ...@@ -105,6 +108,10 @@ public class RatisConfig {
return read; return read;
} }
/** @return the utils-related Ratis configuration section. */
public Utils getUtils() {
return utils;
}
public static Builder newBuilder() { public static Builder newBuilder() {
return new Builder(); return new Builder();
} }
...@@ -117,11 +124,11 @@ public class RatisConfig { ...@@ -117,11 +124,11 @@ public class RatisConfig {
private ThreadPool threadPool; private ThreadPool threadPool;
private Log log; private Log log;
private Grpc grpc; private Grpc grpc;
private Client client; private Client client;
private Impl impl; private Impl impl;
private LeaderLogAppender leaderLogAppender; private LeaderLogAppender leaderLogAppender;
private Read read; private Read read;
private Utils utils;
public RatisConfig build() { public RatisConfig build() {
return new RatisConfig( return new RatisConfig(
...@@ -135,7 +142,8 @@ public class RatisConfig { ...@@ -135,7 +142,8 @@ public class RatisConfig {
Optional.ofNullable(impl).orElseGet(() -> Impl.newBuilder().build()), Optional.ofNullable(impl).orElseGet(() -> Impl.newBuilder().build()),
Optional.ofNullable(leaderLogAppender) Optional.ofNullable(leaderLogAppender)
.orElseGet(() -> LeaderLogAppender.newBuilder().build()), .orElseGet(() -> LeaderLogAppender.newBuilder().build()),
Optional.ofNullable(read).orElseGet(() -> Read.newBuilder().build())); Optional.ofNullable(read).orElseGet(() -> Read.newBuilder().build()),
Optional.ofNullable(utils).orElseGet(() -> Utils.newBuilder().build()));
} }
public Builder setRpc(Rpc rpc) { public Builder setRpc(Rpc rpc) {
...@@ -187,6 +195,10 @@ public class RatisConfig { ...@@ -187,6 +195,10 @@ public class RatisConfig {
this.read = read; this.read = read;
return this; return this;
} }
/**
 * Sets the utils-related Ratis configuration section.
 *
 * @param utils the {@link Utils} section to use; if never set, {@code build()} falls back to
 *     {@code Utils.newBuilder().build()}
 * @return this builder, for chaining — consistent with the other setters of this Builder
 *     (e.g. {@code setRead}), which all return {@code this}
 */
public Builder setUtils(Utils utils) {
  this.utils = utils;
  return this;
}
} }
/** server rpc timeout related. */ /** server rpc timeout related. */
...@@ -1104,4 +1116,34 @@ public class RatisConfig { ...@@ -1104,4 +1116,34 @@ public class RatisConfig {
} }
} }
} }
/** Utils-related Ratis configuration (currently the timer sleep-deviation threshold). */
public static class Utils {

  // Threshold in milliseconds passed through to
  // RaftServerConfigKeys.setSleepDeviationThreshold; immutable once built.
  private final int sleepDeviationThresholdMs;

  private Utils(int sleepDeviationThresholdMs) {
    this.sleepDeviationThresholdMs = sleepDeviationThresholdMs;
  }

  /** @return the configured sleep-deviation threshold in milliseconds. */
  public int getSleepDeviationThresholdMs() {
    return sleepDeviationThresholdMs;
  }

  /** @return a fresh builder initialized with the default threshold. */
  public static Utils.Builder newBuilder() {
    return new Utils.Builder();
  }

  /** Builder for {@link Utils}. */
  public static class Builder {

    // Default threshold: 4 seconds.
    private int sleepDeviationThresholdMs = 4 * 1000;

    /** @return an immutable {@link Utils} holding the current builder values. */
    public Utils build() {
      return new Utils(sleepDeviationThresholdMs);
    }

    /**
     * Sets the sleep-deviation threshold.
     *
     * @param sleepDeviationThresholdMs threshold in milliseconds
     * @return this builder, for chaining — consistent with the other RatisConfig builders in
     *     this file, whose setters all return {@code this}
     */
    public Builder setSleepDeviationThresholdMs(int sleepDeviationThresholdMs) {
      this.sleepDeviationThresholdMs = sleepDeviationThresholdMs;
      return this;
    }
  }
}
} }
...@@ -318,6 +318,11 @@ public class ApplicationStateMachineProxy extends BaseStateMachine { ...@@ -318,6 +318,11 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
Utils.fromRaftPeerIdToNodeId(newLeaderId)); Utils.fromRaftPeerIdToNodeId(newLeaderId));
} }
@Override
public void notifyLeaderReady() {
// Forward the Ratis leader-ready event to the wrapped application state machine.
applicationStateMachine.event().notifyLeaderReady();
}
@Override @Override
public void notifyConfigurationChanged( public void notifyConfigurationChanged(
long term, long index, RaftConfigurationProto newRaftConfiguration) { long term, long index, RaftConfigurationProto newRaftConfiguration) {
......
...@@ -289,5 +289,8 @@ public class Utils { ...@@ -289,5 +289,8 @@ public class Utils {
: RaftServerConfigKeys.Read.Option.LINEARIZABLE; : RaftServerConfigKeys.Read.Option.LINEARIZABLE;
RaftServerConfigKeys.Read.setOption(properties, option); RaftServerConfigKeys.Read.setOption(properties, option);
RaftServerConfigKeys.Read.setTimeout(properties, config.getRead().getReadTimeout()); RaftServerConfigKeys.Read.setTimeout(properties, config.getRead().getReadTimeout());
RaftServerConfigKeys.setSleepDeviationThreshold(
properties, config.getUtils().getSleepDeviationThresholdMs());
} }
} }
...@@ -50,6 +50,7 @@ public class SimpleConsensusServerImpl implements IStateMachine { ...@@ -50,6 +50,7 @@ public class SimpleConsensusServerImpl implements IStateMachine {
stateMachine.start(); stateMachine.start();
// Notify itself as the leader // Notify itself as the leader
stateMachine.event().notifyLeaderChanged(peer.getGroupId(), peer.getNodeId()); stateMachine.event().notifyLeaderChanged(peer.getGroupId(), peer.getNodeId());
stateMachine.event().notifyLeaderReady();
} }
@Override @Override
......
...@@ -307,7 +307,8 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie ...@@ -307,7 +307,8 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
configLeader = null; configLeader = null;
} }
logger.warn( logger.warn(
"Failed to connect to ConfigNode {} from DataNode {}, because the current node is not leader, try next node", "Failed to connect to ConfigNode {} from DataNode {}, because the current node is not "
+ "leader or not ready yet, will try again later",
configNode, configNode,
config.getAddressAndPort()); config.getAddressAndPort());
return true; return true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册