/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iotdb.confignode.procedure.env;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.rpc.TSStatusCode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS;
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;

public class DataNodeRemoveHandler {

  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeRemoveHandler.class);

  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();

  private final ConfigManager configManager;

  /** region migrate lock */
  private final LockQueue regionMigrateLock = new LockQueue();

  public DataNodeRemoveHandler(ConfigManager configManager) {
    this.configManager = configManager;
  }

  public static String getIdWithRpcEndpoint(TDataNodeLocation location) {
    return String.format(
        "[dataNodeId: %s, clientRpcEndPoint: %s]",
        location.getDataNodeId(), location.getClientRpcEndPoint());
  }

  /**
   * Get the ids of all consensus groups whose Regions are hosted on the given DataNode
   *
   * @param removedDataNode the DataNode to be removed
   * @return the ids of the RegionGroups that need to be migrated off this DataNode
   */
  public List<TConsensusGroupId> getMigratedDataNodeRegions(TDataNodeLocation removedDataNode) {
    return configManager.getPartitionManager().getAllReplicaSets().stream()
        .filter(
            replicaSet ->
                replicaSet.getDataNodeLocations().contains(removedDataNode)
                    && replicaSet.regionId.getType() != TConsensusGroupType.ConfigRegion)
        .map(TRegionReplicaSet::getRegionId)
        .collect(Collectors.toList());
  }

  /**
   * Broadcast to the cluster that the DataNodes in the RemoveDataNodeReq are disabled, so they
   * will no longer accept read/write requests
   *
   * @param disabledDataNode TDataNodeLocation of the DataNode to disable
   */
  public void broadcastDisableDataNode(TDataNodeLocation disabledDataNode) {
    LOGGER.info(
        "DataNodeRemoveService start broadcastDisableDataNode to cluster, disabledDataNode: {}",
        getIdWithRpcEndpoint(disabledDataNode));

    List<TDataNodeConfiguration> otherOnlineDataNodes =
        configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
            .filter(node -> !node.getLocation().equals(disabledDataNode))
            .collect(Collectors.toList());

    for (TDataNodeConfiguration node : otherOnlineDataNodes) {
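      // Send a disable request to this DataNode; if any request fails, the broadcast is aborted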
      TDisableDataNodeReq disableReq = new TDisableDataNodeReq(disabledDataNode);
      TSStatus status =
          SyncDataNodeClientPool.getInstance()
              .sendSyncRequestToDataNodeWithRetry(
                  node.getLocation().getInternalEndPoint(),
                  disableReq,
                  DataNodeRequestType.DISABLE_DATA_NODE);
      if (!isSucceed(status)) {
        LOGGER.error(
            "{}, BroadcastDisableDataNode meets error, disabledDataNode: {}, error: {}",
            REMOVE_DATANODE_PROCESS,
            getIdWithRpcEndpoint(disabledDataNode),
            status);
        return;
      }
    }

    LOGGER.info(
        "{}, DataNodeRemoveService finished broadcastDisableDataNode to cluster, disabledDataNode: {}",
        REMOVE_DATANODE_PROCESS,
        getIdWithRpcEndpoint(disabledDataNode));
  }

  /**
   * Find the destination DataNode for the Region to be migrated.
   *
   * @param regionId region id
   * @return the destination DataNode location, or null if no suitable DataNode can be found
   */
  public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
    TSStatus status;
    List<TDataNodeLocation> regionReplicaNodes = findRegionLocations(regionId);
    if (regionReplicaNodes.isEmpty()) {
      LOGGER.warn("Cannot find region replica nodes, region: {}", regionId);
      status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
      status.setMessage("Cannot find region replica nodes, region: " + regionId);
      return null;
    }

    Optional<TDataNodeLocation> newNode = pickNewReplicaNodeForRegion(regionReplicaNodes);
    if (!newNode.isPresent()) {
      LOGGER.warn("Not enough DataNodes to migrate region: {}", regionId);
      return null;
    }
    return newNode.get();
  }

  /**
   * Create a new RegionReplica and build the ConsensusGroup on the destined DataNode
   *
   * <p>createNewRegionPeer should be invoked on a DataNode that doesn't contain any peer of the
   * specific ConsensusGroup, so that no DataNode ends up holding more than one replica of the
   * same RegionGroup.
   *
   * @param regionId The given ConsensusGroup
   * @param destDataNode The destined DataNode where the new peer will be created
   * @return status
   */
  public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocation destDataNode) {
    TSStatus status;
    List<TDataNodeLocation> regionReplicaNodes = findRegionLocations(regionId);
    if (regionReplicaNodes.isEmpty()) {
      LOGGER.warn(
          "{}, Cannot find region replica nodes in createPeer, regionId: {}",
          REGION_MIGRATE_PROCESS,
          regionId);
      status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
      status.setMessage("Cannot find region replica nodes in createPeer, regionId: " + regionId);
      return status;
    }

    List<TDataNodeLocation> currentPeerNodes;
    if (TConsensusGroupType.DataRegion.equals(regionId.getType())
        && IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
      // For IoTConsensus, the createPeer parameter must contain all peers, including the new one
      currentPeerNodes = new ArrayList<>(regionReplicaNodes);
      currentPeerNodes.add(destDataNode);
    } else {
      // For Ratis, the createPeer parameter can be empty
      currentPeerNodes = Collections.emptyList();
    }

    String storageGroup = configManager.getPartitionManager().getRegionStorageGroup(regionId);
    TCreatePeerReq req = new TCreatePeerReq(regionId, currentPeerNodes, storageGroup);
    // TODO replace with real ttl
    req.setTtl(Long.MAX_VALUE);

    status =
        SyncDataNodeClientPool.getInstance()
            .sendSyncRequestToDataNodeWithRetry(
                destDataNode.getInternalEndPoint(),
                req,
                DataNodeRequestType.CREATE_NEW_REGION_PEER);

    LOGGER.info(
        "{}, Send action createNewRegionPeer finished, regionId: {}, newPeerDataNodeId: {}",
        REGION_MIGRATE_PROCESS,
        regionId,
        getIdWithRpcEndpoint(destDataNode));
    if (isFailed(status)) {
      LOGGER.error(
          "{}, Send action createNewRegionPeer error, regionId: {}, newPeerDataNodeId: {}, result: {}",
          REGION_MIGRATE_PROCESS,
          regionId,
          getIdWithRpcEndpoint(destDataNode),
          status);
    }
    return status;
  }

  /**
   * Order the specific ConsensusGroup to add a peer for the new RegionReplica.
   *
   * <p>The addPeer interface can be invoked on any DataNode that contains one of the
   * RegionReplicas of the specified ConsensusGroup except the new one
   *
   * @param destDataNode The DataNodeLocation where the new RegionReplica is created
   * @param regionId region id
   * @return TSStatus
   */
  public TSStatus addRegionPeer(TDataNodeLocation destDataNode, TConsensusGroupId regionId) {
    TSStatus status;

    // Here we pick the DataNode who contains one of the RegionReplica of the specified
    // ConsensusGroup except the new one
    // in order to notify the origin ConsensusGroup that another peer is created and demand to join
    Optional<TDataNodeLocation> selectedDataNode =
        filterDataNodeWithOtherRegionReplica(regionId, destDataNode);
    if (!selectedDataNode.isPresent()) {
      LOGGER.warn(
          "{}, No other DataNode could be selected to perform the add peer process, "
              + "please check RegionGroup: {} by show regions sql command",
          REGION_MIGRATE_PROCESS,
          regionId);
      status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
      status.setMessage(
          "No other DataNode could be selected to perform the add peer process, "
              + "please check by show regions sql command");
      return status;
    }

    // Send addRegionPeer request to the selected DataNode;
    // destDataNode is where the new RegionReplica is created
    TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, destDataNode);
    status =
        SyncDataNodeClientPool.getInstance()
            .sendSyncRequestToDataNodeWithRetry(
                selectedDataNode.get().getInternalEndPoint(),
                maintainPeerReq,
                DataNodeRequestType.ADD_REGION_PEER);
    LOGGER.info(
        "{}, Send action addRegionPeer finished, regionId: {}, rpcDataNode: {}, destDataNode: {}",
        REGION_MIGRATE_PROCESS,
        regionId,
        getIdWithRpcEndpoint(selectedDataNode.get()),
        getIdWithRpcEndpoint(destDataNode));
    return status;
  }

  /**
   * Order the specific ConsensusGroup to remove the peer of the old RegionReplica.
   *
   * <p>The removePeer interface can be invoked on any DataNode that contains one of the
   * RegionReplicas of the specified ConsensusGroup except the original one
   *
   * @param originalDataNode The DataNodeLocation that contains the original RegionReplica
   * @param regionId region id
   * @return TSStatus
   */
  public TSStatus removeRegionPeer(
      TDataNodeLocation originalDataNode,
      TDataNodeLocation destDataNode,
      TConsensusGroupId regionId) {
    TSStatus status;
    TDataNodeLocation rpcClientDataNode;

    // Here we pick a DataNode that holds one of the RegionReplicas of the specified
    // ConsensusGroup, other than the original one, in order to notify the group that the
    // original peer should secede now.
    // If no such DataNode can be selected, destDataNode is used to execute the request instead
    Optional<TDataNodeLocation> selectedDataNode =
        filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);
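    // Note: destDataNode is expected to already hold the new replica created by the preceding
    // addRegionPeer step, which is why it can serve as the fallback executor here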
    rpcClientDataNode = selectedDataNode.orElse(destDataNode);

    // Send removeRegionPeer request to the rpcClientDataNode
    TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, originalDataNode);
    status =
        SyncDataNodeClientPool.getInstance()
            .sendSyncRequestToDataNodeWithRetry(
                rpcClientDataNode.getInternalEndPoint(),
                maintainPeerReq,
                DataNodeRequestType.REMOVE_REGION_PEER);
    LOGGER.info(
        "{}, Send action removeRegionPeer finished, regionId: {}, rpcDataNode: {}",
        REGION_MIGRATE_PROCESS,
        regionId,
        getIdWithRpcEndpoint(rpcClientDataNode));
    return status;
  }

  /**
   * Delete a Region peer in the given ConsensusGroup and all of its data on the specified DataNode
   *
   * <p>If the originalDataNode is down, we should delete the local data and do other cleanup work
   * manually.
   *
   * @param originalDataNode The DataNodeLocation that contains the original RegionReplica
   * @param regionId region id
   * @return TSStatus
   */
  public TSStatus deleteOldRegionPeer(
      TDataNodeLocation originalDataNode, TConsensusGroupId regionId) {
    TSStatus status;
    TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, originalDataNode);
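    // If the original DataNode is already in Unknown status (i.e. likely down), only one attempt
    // is made, presumably to avoid long retries against an unreachable node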
    status =
        configManager.getLoadManager().getNodeStatus(originalDataNode.getDataNodeId())
                == NodeStatus.Unknown
            ? SyncDataNodeClientPool.getInstance()
                .sendSyncRequestToDataNodeWithGivenRetry(
                    originalDataNode.getInternalEndPoint(),
                    maintainPeerReq,
                    DataNodeRequestType.DELETE_OLD_REGION_PEER,
                    1)
            : SyncDataNodeClientPool.getInstance()
                .sendSyncRequestToDataNodeWithRetry(
                    originalDataNode.getInternalEndPoint(),
                    maintainPeerReq,
                    DataNodeRequestType.DELETE_OLD_REGION_PEER);
    LOGGER.info(
        "{}, Send action deleteOldRegionPeer finished, regionId: {}, dataNodeId: {}",
        REGION_MIGRATE_PROCESS,
        regionId,
        originalDataNode.getInternalEndPoint());
    return status;
  }

  /**
   * Update region location cache
   *
   * @param regionId region id
   * @param originalDataNode old location data node
   * @param destDataNode dest data node
   */
  public void updateRegionLocationCache(
      TConsensusGroupId regionId,
      TDataNodeLocation originalDataNode,
      TDataNodeLocation destDataNode) {
    LOGGER.info(
        "Start to updateRegionLocationCache of {}: from {} to {} after the migration succeeded",
        regionId,
        getIdWithRpcEndpoint(originalDataNode),
        getIdWithRpcEndpoint(destDataNode));
    UpdateRegionLocationPlan req =
        new UpdateRegionLocationPlan(regionId, originalDataNode, destDataNode);
    TSStatus status = configManager.getPartitionManager().updateRegionLocation(req);
    LOGGER.info(
        "UpdateRegionLocationCache finished, region: {}, result: {}, old: {}, new: {}",
        regionId,
        status,
        getIdWithRpcEndpoint(originalDataNode),
        getIdWithRpcEndpoint(destDataNode));

    // Remove the RegionGroupCache of the regionId
    configManager.getLoadManager().removeRegionGroupCache(regionId);

    // Broadcast the latest RegionRouteMap when the Region migration is finished
    configManager.getLoadManager().broadcastLatestRegionRouteMap();
  }

  /**
   * Find all DataNodes that contain the given regionId
   *
   * @param regionId region id
   * @return DataNode locations
   */
  public List<TDataNodeLocation> findRegionLocations(TConsensusGroupId regionId) {
    Optional<TRegionReplicaSet> regionReplicaSet =
        configManager.getPartitionManager().getAllReplicaSets().stream()
            .filter(rg -> rg.regionId.equals(regionId))
            .findAny();
    if (regionReplicaSet.isPresent()) {
      return regionReplicaSet.get().getDataNodeLocations();
    }

    return Collections.emptyList();
  }

  private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
      List<TDataNodeLocation> regionReplicaNodes) {
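    // Candidates are Running DataNodes that do not already host a replica of this region; among
    // them, the one with the largest DataNodeId is returned (no load-aware selection yet)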
    return configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
        .map(TDataNodeConfiguration::getLocation)
        .filter(e -> !regionReplicaNodes.contains(e))
        .max(Comparator.comparingInt(TDataNodeLocation::getDataNodeId));
  }

  private boolean isSucceed(TSStatus status) {
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
  }

  private boolean isFailed(TSStatus status) {
    return !isSucceed(status);
  }

  /**
   * Stop old data node
   *
   * @param dataNode old data node
   */
  public void stopDataNode(TDataNodeLocation dataNode) {
    LOGGER.info(
        "{}, Begin to stop DataNode and kill the DataNode process {}",
        REMOVE_DATANODE_PROCESS,
        dataNode);
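    // The stopped DataNode may exit before it can reply, so at most 2 retries are attempted, and
    // its node cache entry is removed regardless of the RPC result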
    TSStatus status =
        SyncDataNodeClientPool.getInstance()
            .sendSyncRequestToDataNodeWithGivenRetry(
                dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE, 2);
    configManager.getLoadManager().removeNodeCache(dataNode.getDataNodeId());
    LOGGER.info(
        "{}, Stop Data Node result: {}, stoppedDataNode: {}",
        REMOVE_DATANODE_PROCESS,
        status,
        dataNode);
  }

  /**
   * Check whether the remove-DataNode request is legal
   *
   * @param removeDataNodePlan RemoveDataNodeReq
   * @return SUCCEED_STATUS when the request is legal
   */
  public DataNodeToStatusResp checkRemoveDataNodeRequest(RemoveDataNodePlan removeDataNodePlan) {
    DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
    dataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));

    TSStatus status = checkClusterProtocol();
    if (isFailed(status)) {
      dataSet.setStatus(status);
      return dataSet;
    }
    status = checkRegionReplication(removeDataNodePlan);
    if (isFailed(status)) {
      dataSet.setStatus(status);
      return dataSet;
    }

    status = checkDataNodeExist(removeDataNodePlan);
    if (isFailed(status)) {
      dataSet.setStatus(status);
      return dataSet;
    }

    return dataSet;
  }

  /**
   * Check whether all DataNodes to be deleted exist in the cluster
   *
   * @param removeDataNodePlan RemoveDataNodeReq
   * @return SUCCEED_STATUS if all DataNodes to be deleted exist in the cluster, DATANODE_NOT_EXIST
   *     otherwise
   */
  private TSStatus checkDataNodeExist(RemoveDataNodePlan removeDataNodePlan) {
    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());

    List<TDataNodeLocation> allDataNodes =
        configManager.getNodeManager().getRegisteredDataNodes().stream()
            .map(TDataNodeConfiguration::getLocation)
            .collect(Collectors.toList());
    boolean hasNotExistNode =
        removeDataNodePlan.getDataNodeLocations().stream()
            .anyMatch(loc -> !allDataNodes.contains(loc));
    if (hasNotExistNode) {
      status.setCode(TSStatusCode.DATANODE_NOT_EXIST.getStatusCode());
      status.setMessage("There exist DataNodes in the request that are not registered in the cluster");
    }
    return status;
  }

  /**
   * Check whether the cluster has enough DataNodes to maintain RegionReplicas
   *
   * @param removeDataNodePlan RemoveDataNodeReq
   * @return SUCCEED_STATUS if the number of DataNodes is enough, NO_ENOUGH_DATANODE otherwise
   */
  private TSStatus checkRegionReplication(RemoveDataNodePlan removeDataNodePlan) {
    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
    List<TDataNodeLocation> removedDataNodes = removeDataNodePlan.getDataNodeLocations();

    int availableDatanodeSize =
        configManager
            .getNodeManager()
            .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.ReadOnly)
            .size();
    // When the replication factor is 1, a DataNode that is not in Running state cannot be removed
    // safely, so such DataNodes are filtered out of the request
    if (CONF.getSchemaReplicationFactor() == 1 || CONF.getDataReplicationFactor() == 1) {
      // Use removeIf instead of removing inside a for-each loop to avoid a
      // ConcurrentModificationException
      removedDataNodes.removeIf(
          dataNodeLocation -> {
            if (!NodeStatus.Running.equals(
                configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId()))) {
              LOGGER.error(
                  "Failed to remove DataNode {} because it is not in Running state and the cluster replication factor is 1",
                  dataNodeLocation);
              return true;
            }
            return false;
          });
      if (removedDataNodes.isEmpty()) {
        status.setCode(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode());
        status.setMessage("Failed to remove all requested data nodes");
        return status;
      }
    }

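    // DataNodes that are already in Unknown status are excluded here because they are not counted
    // in availableDatanodeSize either, so subtracting them would double-count them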
    int removedDataNodeSize =
        (int)
            removeDataNodePlan.getDataNodeLocations().stream()
                .filter(
                    x ->
                        configManager.getLoadManager().getNodeStatus(x.getDataNodeId())
                            != NodeStatus.Unknown)
                .count();
    if (availableDatanodeSize - removedDataNodeSize < NodeInfo.getMinimumDataNode()) {
      status.setCode(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode());
      status.setMessage(
          String.format(
              "Can't remove DataNode due to the limit of the replication factor, "
                  + "availableDataNodeSize: %s, minimumDataNodeNum: %s, max allowed removed DataNode size is: %s",
              availableDatanodeSize,
              NodeInfo.getMinimumDataNode(),
              (availableDatanodeSize - NodeInfo.getMinimumDataNode())));
    }
    return status;
  }

  public LockQueue getRegionMigrateLock() {
    return regionMigrateLock;
  }

  /**
   * Remove the DataNode from NodeInfo
   *
   * @param dataNodeLocation data node location
   */
  public void removeDataNodePersistence(TDataNodeLocation dataNodeLocation) {
    // Remove consensus record
    List<TDataNodeLocation> removeDataNodes = Collections.singletonList(dataNodeLocation);
    configManager.getConsensusManager().write(new RemoveDataNodePlan(removeDataNodes));

    // Adjust maxRegionGroupNum
    configManager.getClusterSchemaManager().adjustMaxRegionGroupNum();

    // Remove metrics
    PartitionMetrics.unbindDataNodePartitionMetrics(
        MetricService.getInstance(),
        NodeUrlUtils.convertTEndPointUrl(dataNodeLocation.getClientRpcEndPoint()));
  }

  /**
   * Change the leader of the given Region.
   *
   * <p>For IOT_CONSENSUS, use the `changeLeaderForIoTConsensus` method to change the
   * regionLeaderMap maintained in ConfigNode.
   *
   * <p>For RATIS_CONSENSUS, invoke the `changeRegionLeader` DataNode RPC method to change the
   * leader.
   *
   * @param regionId The region to be migrated
   * @param originalDataNode The DataNode where the region locates
   * @param migrateDestDataNode The DataNode where the region is to be migrated
   */
  public void changeRegionLeader(
      TConsensusGroupId regionId,
      TDataNodeLocation originalDataNode,
      TDataNodeLocation migrateDestDataNode) {
    Optional<TDataNodeLocation> newLeaderNode =
        filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);

    if (TConsensusGroupType.DataRegion.equals(regionId.getType())
        && IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
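      // With a data replication factor of 1 there is no other replica to take over, so the
      // destination DataNode of the migration is designated as the new leader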
      if (CONF.getDataReplicationFactor() == 1) {
        newLeaderNode = Optional.of(migrateDestDataNode);
      }
      if (newLeaderNode.isPresent()) {
        configManager
            .getLoadManager()
            .forceUpdateRegionLeader(regionId, newLeaderNode.get().getDataNodeId());

        LOGGER.info(
            "{}, Change region leader finished for IOT_CONSENSUS, regionId: {}, newLeaderNode: {}",
            REGION_MIGRATE_PROCESS,
            regionId,
            newLeaderNode);
      }

      return;
    }

    if (newLeaderNode.isPresent()) {
      // TODO: Trigger event post after enhance RegionMigrate procedure
      SyncDataNodeClientPool.getInstance()
          .changeRegionLeader(
              regionId, originalDataNode.getInternalEndPoint(), newLeaderNode.get());
      LOGGER.info(
          "{}, Change region leader finished for RATIS_CONSENSUS, regionId: {}, newLeaderNode: {}",
          REGION_MIGRATE_PROCESS,
          regionId,
          newLeaderNode);
    }
  }

  /**
   * Find a DataNode that contains another RegionReplica of the given RegionGroup, excluding the
   * given location.
   *
   * <p>A RUNNING DataNode is chosen first; if no RUNNING DataNode matches the condition, a
   * REMOVING DataNode is chosen instead.
   *
   * <p>`addRegionPeer`, `removeRegionPeer` and `changeRegionLeader` invoke this method.
   *
   * @param regionId The specific RegionId
   * @param filterLocation The DataNodeLocation that should be filtered
   * @return A DataNodeLocation that contains other RegionReplica and different from the
   *     filterLocation
   */
  private Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
      TConsensusGroupId regionId, TDataNodeLocation filterLocation) {
    List<TDataNodeLocation> regionLocations = findRegionLocations(regionId);
    if (regionLocations.isEmpty()) {
      LOGGER.warn("Cannot find DataNodes that contain the given region: {}", regionId);
      return Optional.empty();
    }

    // Choose the RUNNING DataNodes to execute first
    // If none of them matches, fall back to the REMOVING DataNodes
    List<TDataNodeLocation> aliveDataNodes =
        configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
            .map(TDataNodeConfiguration::getLocation)
            .collect(Collectors.toList());

    aliveDataNodes.addAll(
        configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Removing).stream()
            .map(TDataNodeConfiguration::getLocation)
            .collect(Collectors.toList()));

    // TODO: return the node with the lowest load
    for (TDataNodeLocation aliveDataNode : aliveDataNodes) {
      if (regionLocations.contains(aliveDataNode) && !aliveDataNode.equals(filterLocation)) {
        return Optional.of(aliveDataNode);
      }
    }

    return Optional.empty();
  }

  /**
   * Check the protocol of the cluster; removing a DataNode is currently not supported in
   * standalone (SimpleConsensus) mode
   *
   * @return SUCCEED_STATUS if the cluster does not use the standalone protocol,
   *     REMOVE_DATANODE_ERROR otherwise
   */
  private TSStatus checkClusterProtocol() {
    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
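    // SimpleConsensus is the single-replica (standalone) protocol, so region replicas cannot be
    // migrated to another DataNode and removal is therefore rejected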
    if (CONF.getDataRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS)
        || CONF.getSchemaRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS)) {
      status.setCode(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode());
      status.setMessage("Removing a DataNode is not supported under the SimpleConsensus protocol");
    }
    return status;
  }
}