未验证 提交 656c462a 编写于 作者: L Li Yu Heng 提交者: GitHub

[IOTDB-6129] ConfigNode restarts without relying on Seed-ConfigNode (#10988)

上级 39819a2b
......@@ -74,6 +74,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
......@@ -228,7 +229,7 @@ public abstract class AbstractEnv implements BaseEnv {
fail();
}
testWorking();
testWorkingNoUnknown();
}
public String getTestClassName() {
......@@ -242,7 +243,26 @@ public abstract class AbstractEnv implements BaseEnv {
return "UNKNOWN-IT";
}
public void testWorking() {
private Map<String, Integer> countNodeStatus(Map<Integer, String> nodeStatus) {
Map<String, Integer> result = new HashMap<>();
nodeStatus.values().forEach(status -> result.put(status, result.getOrDefault(status, 0) + 1));
return result;
}
public void testWorkingNoUnknown() {
testWorking(nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals));
}
public void testWorkingOneUnknownOtherRunning() {
testWorking(
nodeStatus -> {
Map<String, Integer> count = countNodeStatus(nodeStatus);
return count.getOrDefault("Unknown", 0) == 1
&& count.getOrDefault("Running", 0) == nodeStatus.size() - 1;
});
}
public void testWorking(Predicate<Map<Integer, String>> statusCheck) {
logger.info("Testing DataNode connection...");
List<String> endpoints =
dataNodeWrapperList.stream()
......@@ -271,7 +291,7 @@ public abstract class AbstractEnv implements BaseEnv {
long startTime = System.currentTimeMillis();
testDelegate.requestAll();
if (!configNodeWrapperList.isEmpty()) {
checkNodeHeartbeat();
checkNodeHeartbeat(statusCheck);
}
logger.info("Start cluster costs: {}s", (System.currentTimeMillis() - startTime) / 1000.0);
} catch (Exception e) {
......@@ -280,7 +300,7 @@ public abstract class AbstractEnv implements BaseEnv {
}
}
private void checkNodeHeartbeat() throws Exception {
private void checkNodeHeartbeat(Predicate<Map<Integer, String>> statusCheck) throws Exception {
logger.info("Testing cluster environment...");
TShowClusterResp showClusterResp;
Exception lastException = null;
......@@ -305,12 +325,7 @@ public abstract class AbstractEnv implements BaseEnv {
// Check the status of nodes
if (flag) {
Map<Integer, String> nodeStatus = showClusterResp.getNodeStatus();
for (String status : nodeStatus.values()) {
if (NodeStatus.Unknown.getStatus().equals(status)) {
flag = false;
break;
}
}
flag = statusCheck.test(nodeStatus);
}
if (flag) {
......@@ -326,6 +341,7 @@ public abstract class AbstractEnv implements BaseEnv {
if (lastException != null) {
throw lastException;
}
throw new Exception("Check not pass");
}
@Override
......@@ -704,7 +720,7 @@ public abstract class AbstractEnv implements BaseEnv {
if (isNeedVerify) {
// Test whether register success
testWorking();
testWorkingNoUnknown();
}
}
......@@ -729,7 +745,7 @@ public abstract class AbstractEnv implements BaseEnv {
if (isNeedVerify) {
// Test whether register success
testWorking();
testWorkingNoUnknown();
}
}
......
......@@ -21,14 +21,12 @@ package org.apache.iotdb.confignode.it.cluster;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
......@@ -194,15 +192,6 @@ public class IoTDBClusterNodeErrorStartUpIT {
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
/* Restart with error cluster name */
TConfigNodeRestartReq configNodeRestartReq =
ConfigNodeTestUtils.generateTConfigNodeRestartReq(
ERROR_CLUSTER_NAME, 1, registeredConfigNodeWrapper);
TSStatus configNodeRestartStatus = client.restartConfigNode(configNodeRestartReq);
Assert.assertEquals(
TSStatusCode.REJECT_NODE_START.getStatusCode(), configNodeRestartStatus.getCode());
Assert.assertTrue(configNodeRestartStatus.getMessage().contains("cluster are inconsistent"));
TDataNodeRestartReq dataNodeRestartReq =
ConfigNodeTestUtils.generateTDataNodeRestartReq(
ERROR_CLUSTER_NAME, 2, registeredDataNodeWrapper);
......@@ -214,15 +203,6 @@ public class IoTDBClusterNodeErrorStartUpIT {
dataNodeRestartResp.getStatus().getMessage().contains("cluster are inconsistent"));
/* Restart with error NodeId */
configNodeRestartReq =
ConfigNodeTestUtils.generateTConfigNodeRestartReq(
TEST_CLUSTER_NAME, 100, registeredConfigNodeWrapper);
configNodeRestartStatus = client.restartConfigNode(configNodeRestartReq);
Assert.assertEquals(
TSStatusCode.REJECT_NODE_START.getStatusCode(), configNodeRestartStatus.getCode());
Assert.assertTrue(configNodeRestartStatus.getMessage().contains("whose nodeId="));
dataNodeRestartReq =
ConfigNodeTestUtils.generateTDataNodeRestartReq(
TEST_CLUSTER_NAME, 200, registeredDataNodeWrapper);
......@@ -256,13 +236,6 @@ public class IoTDBClusterNodeErrorStartUpIT {
Assert.assertNotEquals(-1, registeredConfigNodeId);
int originPort = registeredConfigNodeWrapper.getConsensusPort();
registeredConfigNodeWrapper.setConsensusPort(-12345);
configNodeRestartReq =
ConfigNodeTestUtils.generateTConfigNodeRestartReq(
TEST_CLUSTER_NAME, registeredConfigNodeId, registeredConfigNodeWrapper);
configNodeRestartStatus = client.restartConfigNode(configNodeRestartReq);
Assert.assertEquals(
TSStatusCode.REJECT_NODE_START.getStatusCode(), configNodeRestartStatus.getCode());
Assert.assertTrue(configNodeRestartStatus.getMessage().contains("the internal TEndPoints"));
registeredConfigNodeWrapper.setConsensusPort(originPort);
int registeredDataNodeId = -1;
......
......@@ -32,6 +32,7 @@ import org.apache.iotdb.it.env.cluster.config.MppCommonConfig;
import org.apache.iotdb.it.env.cluster.config.MppJVMConfig;
import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestLogger;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
......@@ -41,6 +42,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.Arrays;
......@@ -50,15 +52,17 @@ import java.util.concurrent.TimeUnit;
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
public class IoTDBClusterRestartIT {
private static final Logger logger = IoTDBTestLogger.logger;
private static final String ratisConsensusProtocolClass =
"org.apache.iotdb.consensus.ratis.RatisConsensus";
private static final int testConfigNodeNum = 2;
private static final int testDataNodeNum = 2;
private static final int testReplicationFactor = 2;
private static final int testConfigNodeNum = 3, testDataNodeNum = 2;
@Before
public void setUp() throws Exception {
public void setUp() {
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
......@@ -69,7 +73,6 @@ public class IoTDBClusterRestartIT {
.setSchemaReplicationFactor(testReplicationFactor)
.setDataReplicationFactor(testReplicationFactor);
// Init 2C2D cluster environment
EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum);
}
......@@ -99,7 +102,7 @@ public class IoTDBClusterRestartIT {
EnvFactory.getEnv().startDataNode(i);
}
((AbstractEnv) EnvFactory.getEnv()).testWorking();
((AbstractEnv) EnvFactory.getEnv()).testWorkingNoUnknown();
}
@Test
......@@ -152,4 +155,20 @@ public class IoTDBClusterRestartIT {
}
// TODO: Add persistence tests in the future
@Test
public void clusterRestartWithoutSeedConfigNode() {
// shutdown all 3 ConfigNodes
for (int i = testConfigNodeNum - 1; i >= 0; i--) {
EnvFactory.getEnv().shutdownConfigNode(i);
}
logger.info("Shutdown all ConfigNode");
// restart without seed ConfigNode, the cluster should still work
for (int i = 1; i < testConfigNodeNum; i++) {
EnvFactory.getEnv().startConfigNode(i);
}
logger.info("Restarted");
((AbstractEnv) EnvFactory.getEnv()).testWorkingOneUnknownOtherRunning();
logger.info("Working without Seed-ConfigNode");
}
}
......@@ -32,7 +32,6 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
......@@ -336,12 +335,6 @@ public class ConfigNodeTestUtils {
return clusterParameters;
}
public static TConfigNodeRestartReq generateTConfigNodeRestartReq(
String clusterName, int nodeId, ConfigNodeWrapper configNodeWrapper) {
return new TConfigNodeRestartReq(
clusterName, generateTConfigNodeLocation(nodeId, configNodeWrapper));
}
public static TDataNodeLocation generateTDataNodeLocation(
int nodeId, DataNodeWrapper dataNodeWrapper) {
TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
......
......@@ -29,7 +29,6 @@ import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
......@@ -80,8 +79,6 @@ public class SyncConfigNodeClientPool {
case NOTIFY_REGISTER_SUCCESS:
client.notifyRegisterSuccess();
return null;
case RESTART_CONFIG_NODE:
return client.restartConfigNode((TConfigNodeRestartReq) req);
case REMOVE_CONFIG_NODE:
return removeConfigNode((TConfigNodeLocation) req, client);
case DELETE_CONFIG_NODE_PEER:
......
......@@ -401,7 +401,9 @@ public class SystemPropertiesUtils {
private static synchronized void storeSystemProperties(Properties systemProperties)
throws IOException {
try (FileOutputStream fileOutputStream = new FileOutputStream(systemPropertiesFile)) {
systemProperties.store(fileOutputStream, "");
systemProperties.store(
fileOutputStream,
" THIS FILE IS AUTOMATICALLY GENERATED. PLEASE DO NOT MODIFY THIS FILE !!!");
}
}
}
......@@ -105,7 +105,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
......@@ -1011,28 +1010,6 @@ public class ConfigManager implements IManager {
return new TConfigNodeRegisterResp().setStatus(status).setConfigNodeId(ERROR_STATUS_NODE_ID);
}
@Override
public TSStatus restartConfigNode(TConfigNodeRestartReq req) {
TSStatus status = confirmLeader();
// Notice: The Seed-ConfigNode must also have the privilege to do Node restart check.
// Otherwise, the IoTDB-cluster will not have the ability to restart from scratch.
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| ConfigNodeDescriptor.getInstance().isSeedConfigNode()
|| SystemPropertiesUtils.isSeedConfigNode()) {
status =
ClusterNodeStartUtils.confirmNodeRestart(
NodeType.ConfigNode,
req.getClusterName(),
req.getConfigNodeLocation().getConfigNodeId(),
req.getConfigNodeLocation(),
this);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return ClusterNodeStartUtils.ACCEPT_NODE_RESTART;
}
}
return status;
}
public TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req) {
final String errorPrefix = "Reject register, please ensure that the parameter ";
final String errorSuffix = " is consistent with the Seed-ConfigNode.";
......
......@@ -51,7 +51,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
......@@ -387,8 +386,6 @@ public interface IManager {
*/
TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req);
TSStatus restartConfigNode(TConfigNodeRestartReq req);
/**
* Create peer in new node to build consensus group.
*
......
......@@ -43,7 +43,6 @@ import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
......@@ -127,17 +126,17 @@ public class ConfigNode implements ConfigNodeMBean {
if (SystemPropertiesUtils.isRestarted()) {
LOGGER.info("{} is in restarting process...", ConfigNodeConstant.GLOBAL_NAME);
int configNodeId;
if (!SystemPropertiesUtils.isSeedConfigNode()) {
// The non-seed-ConfigNodes should send restart request and be checked (ip and port) by
// leader before initConsensusManager
sendRestartConfigNodeRequest();
configNodeId = CONF.getConfigNodeId();
} else {
configNodeId = SEED_CONFIG_NODE_ID;
}
int configNodeId = CONF.getConfigNodeId();
configManager.initConsensusManager();
while (configManager.getConsensusManager().getLeader() == null) {
LOGGER.info("Leader has not been elected yet, wait for 1 second");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("Unexpected interruption during waiting for leader election.");
}
}
setUpMetricService();
// Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
// that the external service is not provided until ConfigNode is fully available
......@@ -363,39 +362,6 @@ public class ConfigNode implements ConfigNodeMBean {
stop();
}
private void sendRestartConfigNodeRequest() throws StartupException {
TConfigNodeRestartReq req =
new TConfigNodeRestartReq(
CONF.getClusterName(), generateConfigNodeLocation(CONF.getConfigNodeId()));
TEndPoint targetConfigNode = CONF.getTargetConfigNode();
if (targetConfigNode == null) {
LOGGER.error(
"Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
throw new StartupException("The targetConfigNode setting in conf is empty");
}
for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
TSStatus status =
(TSStatus)
SyncConfigNodeClientPool.getInstance()
.sendSyncRequestToConfigNodeWithRetry(
targetConfigNode, req, ConfigNodeRequestType.RESTART_CONFIG_NODE);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info("Registration request of current ConfigNode is accepted.");
return;
} else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
targetConfigNode = status.getRedirectNode();
LOGGER.info("ConfigNode need redirect to {}.", targetConfigNode);
} else {
throw new StartupException(status.getMessage());
}
startUpSleep("Register ConfigNode failed! ");
}
}
private TConfigNodeLocation generateConfigNodeLocation(int configNodeId) {
return new TConfigNodeLocation(
configNodeId,
......
......@@ -74,7 +74,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListResp;
......@@ -594,16 +593,6 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return StatusUtils.OK;
}
@Override
public TSStatus restartConfigNode(TConfigNodeRestartReq req) {
TSStatus status = configManager.restartConfigNode(req);
// Print log to record the ConfigNode that performs the RegisterConfigNodeRequest
LOGGER.info("Execute RestartConfigNodeRequest {} with result {}", req, status);
return status;
}
/** For leader to remove ConfigNode configuration in consensus layer */
@Override
public TSStatus removeConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
......
......@@ -42,7 +42,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListResp;
......@@ -564,11 +563,6 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
throw new TException("DataNode to ConfigNode client doesn't support registerConfigNode.");
}
@Override
public TSStatus restartConfigNode(TConfigNodeRestartReq req) throws TException {
throw new TException("DataNode to ConfigNode client doesn't support restartConfigNode.");
}
@Override
public TSStatus addConsensusGroup(TAddConsensusGroupReq registerResp) throws TException {
throw new TException("DataNode to ConfigNode client doesn't support addConsensusGroup.");
......
......@@ -376,11 +376,6 @@ struct TConfigNodeRegisterResp {
2: optional i32 configNodeId
}
struct TConfigNodeRestartReq {
1: required string clusterName
2: required common.TConfigNodeLocation configNodeLocation
}
struct TAddConsensusGroupReq {
1: required list<common.TConfigNodeLocation> configNodeList
}
......@@ -1026,15 +1021,6 @@ service IConfigNodeRPCService {
/** The ConfigNode-leader will notify the Non-Seed-ConfigNode that the registration success */
common.TSStatus notifyRegisterSuccess()
/**
* Restart an existed ConfigNode
*
* @return SUCCESS_STATUS if ConfigNode restart request is accepted
* REJECT_NODE_START if the configuration chek of the ConfigNode to be restarted fails,
* and a detailed error message will be returned.
*/
common.TSStatus restartConfigNode(TConfigNodeRestartReq req)
/**
* Remove the specific ConfigNode from the cluster
*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册