未验证 提交 3a1a26a3 编写于 作者: Y YuFengLiu 提交者: GitHub

add build info in show cluster (#10595)

上级 c0c4ad29
......@@ -53,6 +53,7 @@ import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerLoca
import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateBuildInfoPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
......@@ -280,6 +281,9 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
case RemoveConfigNode:
plan = new RemoveConfigNodePlan();
break;
case UpdateBuildInfo:
plan = new UpdateBuildInfoPlan();
break;
case CreateFunction:
plan = new CreateFunctionPlan();
break;
......
......@@ -27,6 +27,7 @@ public enum ConfigPhysicalPlanType {
/** ConfigNode. */
ApplyConfigNode((short) 0),
RemoveConfigNode((short) 1),
UpdateBuildInfo((short) 2),
/** DataNode. */
RegisterDataNode((short) 100),
......
......@@ -50,7 +50,6 @@ public class ApplyConfigNodePlan extends ConfigPhysicalPlan {
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(getType().getPlanType(), stream);
ThriftConfigNodeSerDeUtils.serializeTConfigNodeLocation(configNodeLocation, stream);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.consensus.request.write.confignode;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
public class UpdateBuildInfoPlan extends ConfigPhysicalPlan {
private int nodeId;
private String buildInfo;
public UpdateBuildInfoPlan() {
super(ConfigPhysicalPlanType.UpdateBuildInfo);
}
public UpdateBuildInfoPlan(String buildInfo, int nodeId) {
this();
this.buildInfo = buildInfo;
this.nodeId = nodeId;
}
public String getBuildInfo() {
return buildInfo;
}
public int getNodeId() {
return nodeId;
}
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(getType().getPlanType(), stream);
ReadWriteIOUtils.write(nodeId, stream);
ReadWriteIOUtils.write(buildInfo, stream);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) {
nodeId = ReadWriteIOUtils.readInt(buffer);
buildInfo = ReadWriteIOUtils.readString(buffer);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!getType().equals(((UpdateBuildInfoPlan) o).getType())) {
return false;
}
UpdateBuildInfoPlan that = (UpdateBuildInfoPlan) o;
return nodeId == that.nodeId && buildInfo.equals(that.buildInfo);
}
@Override
public int hashCode() {
return Objects.hash(nodeId, buildInfo);
}
}
......@@ -61,7 +61,6 @@ import org.apache.iotdb.confignode.consensus.request.write.database.SetDataRepli
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.response.auth.PermissionInfoResp;
......@@ -348,11 +347,9 @@ public class ConfigManager implements IManager {
req.getDataNodeConfiguration().getLocation(),
this);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return nodeManager.registerDataNode(
new RegisterDataNodePlan(req.getDataNodeConfiguration()));
return nodeManager.registerDataNode(req);
}
}
DataNodeRegisterResp resp = new DataNodeRegisterResp();
resp.setStatus(status);
resp.setConfigNodeList(getNodeManager().getRegisteredConfigNodes());
......@@ -375,7 +372,7 @@ public class ConfigManager implements IManager {
req.getDataNodeConfiguration().getLocation(),
this);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return nodeManager.updateDataNodeIfNecessary(req.getDataNodeConfiguration());
return nodeManager.updateDataNodeIfNecessary(req);
}
}
......@@ -447,9 +444,12 @@ public class ConfigManager implements IManager {
.sorted(Comparator.comparingInt(TDataNodeLocation::getDataNodeId))
.collect(Collectors.toList());
Map<Integer, String> nodeStatus = getLoadManager().getNodeStatusWithReason();
return new TShowClusterResp(status, configNodeLocations, dataNodeInfoLocations, nodeStatus);
Map<Integer, String> nodeBuildInfo = getNodeManager().getNodeBuildInfo();
return new TShowClusterResp(
status, configNodeLocations, dataNodeInfoLocations, nodeStatus, nodeBuildInfo);
} else {
return new TShowClusterResp(status, new ArrayList<>(), new ArrayList<>(), new HashMap<>());
return new TShowClusterResp(
status, new ArrayList<>(), new ArrayList<>(), new HashMap<>(), new HashMap<>());
}
}
......@@ -1032,7 +1032,7 @@ public class ConfigManager implements IManager {
req.getConfigNodeLocation(),
this);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return nodeManager.restartConfigNode(req.getConfigNodeLocation());
return ClusterNodeStartUtils.ACCEPT_NODE_RESTART;
}
}
return status;
......
......@@ -467,7 +467,7 @@ public class ProcedureManager {
*/
public void addConfigNode(TConfigNodeRegisterReq req) {
AddConfigNodeProcedure addConfigNodeProcedure =
new AddConfigNodeProcedure(req.getConfigNodeLocation());
new AddConfigNodeProcedure(req.getConfigNodeLocation(), req.getBuildInfo());
this.executor.submitProcedure(addConfigNodeProcedure);
}
......
......@@ -40,6 +40,7 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateBuildInfoPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
......@@ -65,6 +66,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
......@@ -73,6 +76,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
......@@ -234,17 +238,24 @@ public class NodeManager {
/**
* Register DataNode.
*
* @param registerDataNodePlan RegisterDataNodeReq
* @param req TDataNodeRegisterReq
* @return DataNodeConfigurationDataSet. The {@link TSStatus} will be set to {@link
* TSStatusCode#SUCCESS_STATUS} when register success.
*/
public DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan) {
public DataSet registerDataNode(TDataNodeRegisterReq req) {
int dataNodeId = nodeInfo.generateNextNodeId();
RegisterDataNodePlan registerDataNodePlan =
new RegisterDataNodePlan(req.getDataNodeConfiguration());
// Register new DataNode
registerDataNodePlan.getDataNodeConfiguration().getLocation().setDataNodeId(dataNodeId);
getConsensusManager().write(registerDataNodePlan);
// update datanode's buildInfo
UpdateBuildInfoPlan updateBuildInfoPlan =
new UpdateBuildInfoPlan(req.getBuildInfo(), dataNodeId);
getConsensusManager().write(updateBuildInfoPlan);
// Bind DataNode metrics
PartitionMetrics.bindDataNodePartitionMetrics(
MetricService.getInstance(), configManager, dataNodeId);
......@@ -262,15 +273,21 @@ public class NodeManager {
return resp;
}
public TDataNodeRestartResp updateDataNodeIfNecessary(
TDataNodeConfiguration dataNodeConfiguration) {
TDataNodeConfiguration recordConfiguration =
getRegisteredDataNode(dataNodeConfiguration.getLocation().getDataNodeId());
if (!recordConfiguration.equals(dataNodeConfiguration)) {
public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) {
int nodeId = req.getDataNodeConfiguration().getLocation().getDataNodeId();
TDataNodeConfiguration dataNodeConfiguration = getRegisteredDataNode(nodeId);
if (!req.getDataNodeConfiguration().equals(dataNodeConfiguration)) {
// Update DataNodeConfiguration when modified during restart
UpdateDataNodePlan updateDataNodePlan = new UpdateDataNodePlan(dataNodeConfiguration);
UpdateDataNodePlan updateDataNodePlan =
new UpdateDataNodePlan(req.getDataNodeConfiguration());
getConsensusManager().write(updateDataNodePlan);
}
String recordBuildInfo = nodeInfo.getBuildInfo(nodeId);
if (!req.getBuildInfo().equals(recordBuildInfo)) {
// Update buildInfo when modified during restart
UpdateBuildInfoPlan updateBuildInfoPlan = new UpdateBuildInfoPlan(req.getBuildInfo(), nodeId);
getConsensusManager().write(updateBuildInfoPlan);
}
TDataNodeRestartResp resp = new TDataNodeRestartResp();
resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
......@@ -338,8 +355,14 @@ public class NodeManager {
.setConfigNodeId(nodeId);
}
public TSStatus restartConfigNode(TConfigNodeLocation configNodeLocation) {
// TODO: @Itami-Sho, update peer if necessary
public TSStatus updateConfigNodeIfNecessary(int configNodeId, String buildInfo) {
String recordBuildInfo = nodeInfo.getBuildInfo(configNodeId);
if (!recordBuildInfo.equals(buildInfo)) {
// Update buildInfo when modified during restart
UpdateBuildInfoPlan updateConfigNodePlan = new UpdateBuildInfoPlan(buildInfo, configNodeId);
ConsensusWriteResponse result = getConsensusManager().write(updateConfigNodePlan);
return result.getStatus();
}
return ClusterNodeStartUtils.ACCEPT_NODE_RESTART;
}
......@@ -461,6 +484,10 @@ public class NodeManager {
return nodeInfo.getRegisteredConfigNodes();
}
public Map<Integer, String> getNodeBuildInfo() {
return nodeInfo.getNodeBuildInfo();
}
public List<TConfigNodeInfo> getRegisteredConfigNodeInfoList() {
List<TConfigNodeInfo> configNodeInfoList = new ArrayList<>();
List<TConfigNodeLocation> registeredConfigNodes = this.getRegisteredConfigNodes();
......@@ -487,11 +514,15 @@ public class NodeManager {
/**
* Only leader use this interface, record the new ConfigNode's information.
*
* @param configNodeLocation The new ConfigNode
* @param configNodeLocation The new ConfigNode.
* @param buildInfo The new ConfigNode's buildInfo.
*/
public void applyConfigNode(TConfigNodeLocation configNodeLocation) {
public void applyConfigNode(TConfigNodeLocation configNodeLocation, String buildInfo) {
ApplyConfigNodePlan applyConfigNodePlan = new ApplyConfigNodePlan(configNodeLocation);
getConsensusManager().write(applyConfigNodePlan);
UpdateBuildInfoPlan updateBuildInfoPlan =
new UpdateBuildInfoPlan(buildInfo, configNodeLocation.getConfigNodeId());
getConsensusManager().write(updateBuildInfoPlan);
}
/**
......
......@@ -51,6 +51,7 @@ import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerLoca
import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateBuildInfoPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
......@@ -363,6 +364,8 @@ public class ConfigPlanExecutor {
return nodeInfo.applyConfigNode((ApplyConfigNodePlan) physicalPlan);
case RemoveConfigNode:
return nodeInfo.removeConfigNode((RemoveConfigNodePlan) physicalPlan);
case UpdateBuildInfo:
return nodeInfo.updateBuildInfo((UpdateBuildInfoPlan) physicalPlan);
case CreateFunction:
return udfInfo.addUDFInTable((CreateFunctionPlan) physicalPlan);
case DropFunction:
......
......@@ -28,6 +28,7 @@ import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateBuildInfoPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
......@@ -86,9 +87,13 @@ public class NodeInfo implements SnapshotProcessor {
// Registered DataNodes
private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
private final ReentrantReadWriteLock buildInfoReadWriteLock;
private final AtomicInteger nextNodeId = new AtomicInteger(-1);
private final Map<Integer, TDataNodeConfiguration> registeredDataNodes;
private final Map<Integer, String> nodeBuildInfo;
private static final String SNAPSHOT_FILENAME = "node_info.bin";
public NodeInfo() {
......@@ -97,6 +102,9 @@ public class NodeInfo implements SnapshotProcessor {
this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
this.registeredDataNodes = new ConcurrentHashMap<>();
this.nodeBuildInfo = new ConcurrentHashMap<>();
this.buildInfoReadWriteLock = new ReentrantReadWriteLock();
}
/**
......@@ -120,7 +128,6 @@ public class NodeInfo implements SnapshotProcessor {
}
}
registeredDataNodes.put(info.getLocation().getDataNodeId(), info);
result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
if (nextNodeId.get() < MINIMUM_DATANODE) {
result.setMessage(
......@@ -149,14 +156,17 @@ public class NodeInfo implements SnapshotProcessor {
registeredDataNodes.size());
dataNodeInfoReadWriteLock.writeLock().lock();
buildInfoReadWriteLock.writeLock().lock();
try {
req.getDataNodeLocations()
.forEach(
removeDataNodes -> {
registeredDataNodes.remove(removeDataNodes.getDataNodeId());
nodeBuildInfo.remove(removeDataNodes.getDataNodeId());
LOGGER.info("Removed the datanode {} from cluster", removeDataNodes);
});
} finally {
buildInfoReadWriteLock.writeLock().unlock();
dataNodeInfoReadWriteLock.writeLock().unlock();
}
LOGGER.info(
......@@ -326,8 +336,10 @@ public class NodeInfo implements SnapshotProcessor {
public TSStatus removeConfigNode(RemoveConfigNodePlan removeConfigNodePlan) {
TSStatus status = new TSStatus();
configNodeInfoReadWriteLock.writeLock().lock();
buildInfoReadWriteLock.writeLock().lock();
try {
registeredConfigNodes.remove(removeConfigNodePlan.getConfigNodeLocation().getConfigNodeId());
nodeBuildInfo.remove(removeConfigNodePlan.getConfigNodeLocation().getConfigNodeId());
SystemPropertiesUtils.storeConfigNodeList(new ArrayList<>(registeredConfigNodes.values()));
LOGGER.info(
"Successfully remove ConfigNode: {}. Current ConfigNodeGroup: {}",
......@@ -340,11 +352,29 @@ public class NodeInfo implements SnapshotProcessor {
status.setMessage(
"Remove ConfigNode failed because current ConfigNode can't store ConfigNode information.");
} finally {
buildInfoReadWriteLock.writeLock().unlock();
configNodeInfoReadWriteLock.writeLock().unlock();
}
return status;
}
/**
* Update the specified Node‘s buildInfo.
*
* @param updateBuildInfoPlan UpdateBuildInfoPlan
* @return {@link TSStatusCode#SUCCESS_STATUS} if update build info successfully.
*/
public TSStatus updateBuildInfo(UpdateBuildInfoPlan updateBuildInfoPlan) {
buildInfoReadWriteLock.writeLock().lock();
try {
nodeBuildInfo.put(updateBuildInfoPlan.getNodeId(), updateBuildInfoPlan.getBuildInfo());
} finally {
buildInfoReadWriteLock.writeLock().unlock();
}
LOGGER.info("Successfully update Node {} 's buildInfo.", updateBuildInfoPlan.getNodeId());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
/** @return All registered ConfigNodes. */
public List<TConfigNodeLocation> getRegisteredConfigNodes() {
List<TConfigNodeLocation> result;
......@@ -374,6 +404,27 @@ public class NodeInfo implements SnapshotProcessor {
return result;
}
/** @return all nodes buildInfo */
public Map<Integer, String> getNodeBuildInfo() {
Map<Integer, String> result = new HashMap<>();
buildInfoReadWriteLock.readLock().lock();
try {
result.putAll(nodeBuildInfo);
} finally {
buildInfoReadWriteLock.readLock().unlock();
}
return result;
}
public String getBuildInfo(int nodeId) {
buildInfoReadWriteLock.readLock().lock();
try {
return nodeBuildInfo.getOrDefault(nodeId, "Unknown");
} finally {
buildInfoReadWriteLock.readLock().unlock();
}
}
public int generateNextNodeId() {
return nextNodeId.incrementAndGet();
}
......@@ -391,6 +442,7 @@ public class NodeInfo implements SnapshotProcessor {
File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID());
configNodeInfoReadWriteLock.readLock().lock();
dataNodeInfoReadWriteLock.readLock().lock();
buildInfoReadWriteLock.readLock().lock();
try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile);
TIOStreamTransport tioStreamTransport = new TIOStreamTransport(fileOutputStream)) {
......@@ -402,6 +454,8 @@ public class NodeInfo implements SnapshotProcessor {
serializeRegisteredDataNode(fileOutputStream, protocol);
serializeBuildInfo(fileOutputStream);
fileOutputStream.flush();
fileOutputStream.close();
......@@ -409,8 +463,9 @@ public class NodeInfo implements SnapshotProcessor {
return tmpFile.renameTo(snapshotFile);
} finally {
configNodeInfoReadWriteLock.readLock().unlock();
buildInfoReadWriteLock.readLock().unlock();
dataNodeInfoReadWriteLock.readLock().unlock();
configNodeInfoReadWriteLock.readLock().unlock();
for (int retry = 0; retry < 5; retry++) {
if (!tmpFile.exists() || tmpFile.delete()) {
break;
......@@ -440,6 +495,14 @@ public class NodeInfo implements SnapshotProcessor {
}
}
private void serializeBuildInfo(OutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(nodeBuildInfo.size(), outputStream);
for (Entry<Integer, String> entry : nodeBuildInfo.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
}
@Override
public void processLoadSnapshot(File snapshotDir) throws IOException, TException {
......@@ -453,6 +516,7 @@ public class NodeInfo implements SnapshotProcessor {
configNodeInfoReadWriteLock.writeLock().lock();
dataNodeInfoReadWriteLock.writeLock().lock();
buildInfoReadWriteLock.writeLock().lock();
try (FileInputStream fileInputStream = new FileInputStream(snapshotFile);
TIOStreamTransport tioStreamTransport = new TIOStreamTransport(fileInputStream)) {
......@@ -466,9 +530,12 @@ public class NodeInfo implements SnapshotProcessor {
deserializeRegisteredDataNode(fileInputStream, protocol);
deserializeBuildInfo(fileInputStream);
} finally {
configNodeInfoReadWriteLock.writeLock().unlock();
buildInfoReadWriteLock.writeLock().unlock();
dataNodeInfoReadWriteLock.writeLock().unlock();
configNodeInfoReadWriteLock.writeLock().unlock();
}
}
......@@ -496,6 +563,20 @@ public class NodeInfo implements SnapshotProcessor {
}
}
private void deserializeBuildInfo(InputStream inputStream) throws IOException {
// old version may not have build info,
// thus we need to check inputStream before deserialize.
if (inputStream.available() != 0) {
int size = ReadWriteIOUtils.readInt(inputStream);
while (size > 0) {
int nodeId = ReadWriteIOUtils.readInt(inputStream);
String buildInfo = ReadWriteIOUtils.readString(inputStream);
nodeBuildInfo.put(nodeId, buildInfo);
size--;
}
}
}
public static int getMinimumDataNode() {
return MINIMUM_DATANODE;
}
......@@ -504,6 +585,7 @@ public class NodeInfo implements SnapshotProcessor {
nextNodeId.set(-1);
registeredDataNodes.clear();
registeredConfigNodes.clear();
nodeBuildInfo.clear();
}
@Override
......@@ -517,11 +599,12 @@ public class NodeInfo implements SnapshotProcessor {
NodeInfo nodeInfo = (NodeInfo) o;
return registeredConfigNodes.equals(nodeInfo.registeredConfigNodes)
&& nextNodeId.get() == nodeInfo.nextNodeId.get()
&& registeredDataNodes.equals(nodeInfo.registeredDataNodes);
&& registeredDataNodes.equals(nodeInfo.registeredDataNodes)
&& nodeBuildInfo.equals(nodeInfo.nodeBuildInfo);
}
@Override
public int hashCode() {
return Objects.hash(registeredConfigNodes, nextNodeId, registeredDataNodes);
return Objects.hash(registeredConfigNodes, nextNodeId, registeredDataNodes, nodeBuildInfo);
}
}
......@@ -324,11 +324,18 @@ public class ConfigNodeProcedureEnv {
* Leader will record the new ConfigNode's information.
*
* @param configNodeLocation The new ConfigNode
* @param buildInfo The new ConfigNode's buildInfo
*/
public void applyConfigNode(TConfigNodeLocation configNodeLocation) {
configManager.getNodeManager().applyConfigNode(configNodeLocation);
public void applyConfigNode(TConfigNodeLocation configNodeLocation, String buildInfo) {
configManager.getNodeManager().applyConfigNode(configNodeLocation, buildInfo);
}
/**
* Leader will record the new Confignode's information.
*
* @param dataNodeConfiguration The new DataNode
*/
/**
* Leader will notify the new ConfigNode that registration success.
*
......
......@@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.state.AddConfigNodeState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -42,13 +43,16 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
private TConfigNodeLocation tConfigNodeLocation;
private String buildInfo;
public AddConfigNodeProcedure() {
super();
}
public AddConfigNodeProcedure(TConfigNodeLocation tConfigNodeLocation) {
public AddConfigNodeProcedure(TConfigNodeLocation tConfigNodeLocation, String buildInfo) {
super();
this.tConfigNodeLocation = tConfigNodeLocation;
this.buildInfo = buildInfo;
}
@Override
......@@ -75,7 +79,7 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
break;
case REGISTER_SUCCESS:
env.notifyRegisterSuccess(tConfigNodeLocation);
env.applyConfigNode(tConfigNodeLocation);
env.applyConfigNode(tConfigNodeLocation, buildInfo);
env.broadCastTheLatestConfigNodeGroup();
LOG.info("The ConfigNode: {} is successfully added to the cluster", tConfigNodeLocation);
return Flow.NO_MORE_STATE;
......@@ -145,6 +149,7 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
stream.writeShort(ProcedureType.ADD_CONFIG_NODE_PROCEDURE.getTypeCode());
super.serialize(stream);
ThriftConfigNodeSerDeUtils.serializeTConfigNodeLocation(tConfigNodeLocation, stream);
ReadWriteIOUtils.write(buildInfo, stream);
}
@Override
......@@ -152,6 +157,11 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
super.deserialize(byteBuffer);
try {
tConfigNodeLocation = ThriftConfigNodeSerDeUtils.deserializeTConfigNodeLocation(byteBuffer);
// old version may not have build info,
// thus we need to check inputStream before deserialize.
if (byteBuffer.hasRemaining()) {
buildInfo = ReadWriteIOUtils.readString(byteBuffer);
}
} catch (ThriftSerDeException e) {
LOG.error("Error in deserialize AddConfigNodeProcedure", e);
}
......@@ -163,13 +173,14 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
AddConfigNodeProcedure thatProc = (AddConfigNodeProcedure) that;
return thatProc.getProcId() == this.getProcId()
&& thatProc.getState() == this.getState()
&& thatProc.tConfigNodeLocation.equals(this.tConfigNodeLocation);
&& thatProc.tConfigNodeLocation.equals(this.tConfigNodeLocation)
&& thatProc.buildInfo.equals(this.buildInfo);
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(this.tConfigNodeLocation);
return Objects.hash(this.tConfigNodeLocation, this.buildInfo);
}
}
......@@ -122,12 +122,17 @@ public class ConfigNode implements ConfigNodeMBean {
if (SystemPropertiesUtils.isRestarted()) {
LOGGER.info("{} is in restarting process...", ConfigNodeConstant.GLOBAL_NAME);
int configNodeId;
if (!SystemPropertiesUtils.isSeedConfigNode()) {
// The non-seed-ConfigNodes should send restart request
// The non-seed-ConfigNodes should send restart request and be checked (ip and port) by
// leader before initConsensusManager
sendRestartConfigNodeRequest();
configNodeId = CONF.getConfigNodeId();
} else {
configNodeId = SEED_CONFIG_NODE_ID;
}
configManager.initConsensusManager();
setUpMetricService();
// Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
// that the external service is not provided until ConfigNode is fully available
......@@ -137,6 +142,20 @@ public class ConfigNode implements ConfigNodeMBean {
"{} has successfully restarted and joined the cluster: {}.",
ConfigNodeConstant.GLOBAL_NAME,
CONF.getClusterName());
// Update item during restart
// This will always be executed until the consensus write succeeds
while (true) {
TSStatus status =
configManager
.getNodeManager()
.updateConfigNodeIfNecessary(configNodeId, IoTDBConstant.BUILD_INFO);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
break;
} else {
startUpSleep("restart ConfigNode failed! ");
}
}
return;
}
......@@ -158,10 +177,7 @@ public class ConfigNode implements ConfigNodeMBean {
configManager
.getNodeManager()
.applyConfigNode(
new TConfigNodeLocation(
SEED_CONFIG_NODE_ID,
new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())));
generateConfigNodeLocation(SEED_CONFIG_NODE_ID), IoTDBConstant.BUILD_INFO);
setUpMetricService();
// Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
// that the external service is not provided until Seed-ConfigNode is fully initialized
......@@ -200,13 +216,7 @@ public class ConfigNode implements ConfigNodeMBean {
isJoinedCluster = true;
break;
}
try {
TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
} catch (InterruptedException e) {
LOGGER.warn("Waiting leader's scheduling is interrupted.");
Thread.currentThread().interrupt();
}
startUpSleep("Waiting leader's scheduling is interrupted.");
}
if (!isJoinedCluster) {
......@@ -294,10 +304,9 @@ public class ConfigNode implements ConfigNodeMBean {
TConfigNodeRegisterReq req =
new TConfigNodeRegisterReq(
configManager.getClusterParameters(),
new TConfigNodeLocation(
INIT_NON_SEED_CONFIG_NODE_ID,
new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())));
generateConfigNodeLocation(INIT_NON_SEED_CONFIG_NODE_ID));
req.setBuildInfo(IoTDBConstant.BUILD_INFO);
TEndPoint targetConfigNode = CONF.getTargetConfigNode();
if (targetConfigNode == null) {
......@@ -336,13 +345,7 @@ public class ConfigNode implements ConfigNodeMBean {
} else {
throw new StartupException(status.getMessage());
}
try {
TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StartupException("Register ConfigNode failed!");
}
startUpSleep("Register ConfigNode failed!");
}
LOGGER.error(
......@@ -351,13 +354,10 @@ public class ConfigNode implements ConfigNodeMBean {
}
private void sendRestartConfigNodeRequest() throws StartupException {
TConfigNodeRestartReq req =
new TConfigNodeRestartReq(
CONF.getClusterName(),
new TConfigNodeLocation(
CONF.getConfigNodeId(),
new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())));
CONF.getClusterName(), generateConfigNodeLocation(CONF.getConfigNodeId()));
TEndPoint targetConfigNode = CONF.getTargetConfigNode();
if (targetConfigNode == null) {
......@@ -382,13 +382,23 @@ public class ConfigNode implements ConfigNodeMBean {
} else {
throw new StartupException(status.getMessage());
}
startUpSleep("Register ConfigNode failed! ");
}
}
try {
TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StartupException("Register ConfigNode failed! ");
}
private TConfigNodeLocation generateConfigNodeLocation(int configNodeId) {
return new TConfigNodeLocation(
configNodeId,
new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()));
}
private void startUpSleep(String errorMessage) throws StartupException {
try {
TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StartupException(errorMessage);
}
}
......
......@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.procedure.impl.node;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
......@@ -37,7 +38,8 @@ public class AddConfigNodeProcedureTest {
AddConfigNodeProcedure procedure0 =
new AddConfigNodeProcedure(
new TConfigNodeLocation(
0, new TEndPoint("127.0.0.1", 10710), new TEndPoint("0.0.0.0", 10720)));
0, new TEndPoint("127.0.0.1", 10710), new TEndPoint("0.0.0.0", 10720)),
IoTDBConstant.BUILD_INFO);
try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
......
......@@ -329,7 +329,8 @@ public class ColumnHeaderConstant {
new ColumnHeader(NODE_TYPE, TSDataType.TEXT),
new ColumnHeader(STATUS, TSDataType.TEXT),
new ColumnHeader(INTERNAL_ADDRESS, TSDataType.TEXT),
new ColumnHeader(INTERNAL_PORT, TSDataType.INT32));
new ColumnHeader(INTERNAL_PORT, TSDataType.INT32),
new ColumnHeader(BUILD_INFO, TSDataType.TEXT));
public static final List<ColumnHeader> showClusterDetailsColumnHeaders =
ImmutableList.of(
......
......@@ -62,13 +62,15 @@ public class ShowClusterTask implements IConfigTask {
String nodeType,
String nodeStatus,
String hostAddress,
int port) {
int port,
String buildInfo) {
builder.getTimeColumnBuilder().writeLong(0L);
builder.getColumnBuilder(0).writeInt(nodeId);
builder.getColumnBuilder(1).writeBinary(new Binary(nodeType));
builder.getColumnBuilder(2).writeBinary(new Binary(nodeStatus));
builder.getColumnBuilder(3).writeBinary(new Binary(hostAddress));
builder.getColumnBuilder(4).writeInt(port);
builder.getColumnBuilder(5).writeBinary(new Binary(buildInfo));
builder.declarePosition();
}
......@@ -90,7 +92,8 @@ public class ShowClusterTask implements IConfigTask {
NODE_TYPE_CONFIG_NODE,
clusterNodeInfos.getNodeStatus().get(e.getConfigNodeId()),
e.getInternalEndPoint().getIp(),
e.getInternalEndPoint().getPort()));
e.getInternalEndPoint().getPort(),
clusterNodeInfos.getNodeBuildInfo().get(e.getConfigNodeId())));
clusterNodeInfos
.getDataNodeList()
......@@ -102,7 +105,8 @@ public class ShowClusterTask implements IConfigTask {
NODE_TYPE_DATA_NODE,
clusterNodeInfos.getNodeStatus().get(e.getDataNodeId()),
e.getInternalEndPoint().getIp(),
e.getInternalEndPoint().getPort()));
e.getInternalEndPoint().getPort(),
clusterNodeInfos.getNodeBuildInfo().get(e.getDataNodeId())));
DatasetHeader datasetHeader = DatasetHeaderFactory.getShowClusterHeader();
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));
......
......@@ -361,6 +361,7 @@ public class DataNode implements DataNodeMBean {
TDataNodeRegisterReq req = new TDataNodeRegisterReq();
req.setDataNodeConfiguration(generateDataNodeConfiguration());
req.setClusterName(config.getClusterName());
req.setBuildInfo(IoTDBConstant.BUILD_INFO);
TDataNodeRegisterResp dataNodeRegisterResp = null;
while (retry > 0) {
try (ConfigNodeClient configNodeClient =
......@@ -420,6 +421,7 @@ public class DataNode implements DataNodeMBean {
req.setClusterName(
config.getClusterName() == null ? DEFAULT_CLUSTER_NAME : config.getClusterName());
req.setDataNodeConfiguration(generateDataNodeConfiguration());
req.setBuildInfo(IoTDBConstant.BUILD_INFO);
TDataNodeRestartResp dataNodeRestartResp = null;
while (retry > 0) {
try (ConfigNodeClient configNodeClient =
......
......@@ -102,6 +102,7 @@ struct TRuntimeConfiguration {
struct TDataNodeRegisterReq {
1: required string clusterName
2: required common.TDataNodeConfiguration dataNodeConfiguration
3: optional string buildInfo = "Unknown"
}
struct TDataNodeRegisterResp {
......@@ -114,6 +115,7 @@ struct TDataNodeRegisterResp {
struct TDataNodeRestartReq {
1: required string clusterName
2: required common.TDataNodeConfiguration dataNodeConfiguration
3: optional string buildInfo = "Unknown"
}
struct TDataNodeRestartResp {
......@@ -366,6 +368,7 @@ struct TConfigNodeRegisterReq {
// fields are consistent with the Seed-ConfigNode
1: required TClusterParameters clusterParameters
2: required common.TConfigNodeLocation configNodeLocation
3: optional string buildInfo = "Unknown"
}
struct TConfigNodeRegisterResp {
......@@ -483,6 +486,7 @@ struct TShowClusterResp {
2: required list<common.TConfigNodeLocation> configNodeList
3: required list<common.TDataNodeLocation> dataNodeList
4: required map<i32, string> nodeStatus
5: required map<i32, string> nodeBuildInfo
}
struct TShowVariablesResp {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册