diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
index 6bfa9ae0671687db61a8d901bd406c57c0ce40b2..86bb000ca05b64ec9fd616571a3ea7830de107b2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
@@ -205,10 +205,21 @@ public class PartitionBalancer {
    */
   public void reBalanceDataPartitionPolicy(String database) {
     try {
-      dataPartitionPolicyTableMap
-          .computeIfAbsent(database, empty -> new DataPartitionPolicyTable())
-          .reBalanceDataPartitionPolicy(
-              getPartitionManager().getAllRegionGroupIds(database, TConsensusGroupType.DataRegion));
+      DataPartitionPolicyTable dataPartitionPolicyTable =
+          dataPartitionPolicyTableMap.computeIfAbsent(
+              database, empty -> new DataPartitionPolicyTable());
+
+      // Acquire the lock before entering try so that finally never releases a lock
+      // this thread failed to obtain
+      dataPartitionPolicyTable.acquireLock();
+      try {
+        dataPartitionPolicyTable.reBalanceDataPartitionPolicy(
+            getPartitionManager().getAllRegionGroupIds(database, TConsensusGroupType.DataRegion));
+        dataPartitionPolicyTable.logDataAllotTable(database);
+      } finally {
+        dataPartitionPolicyTable.releaseLock();
+      }
+
     } catch (DatabaseNotExistsException e) {
       LOGGER.error("Database {} not exists when updateDataAllotTable", database);
     }
@@ -224,6 +235,8 @@ public class PartitionBalancer {
               DataPartitionPolicyTable dataPartitionPolicyTable = new DataPartitionPolicyTable();
               dataPartitionPolicyTableMap.put(database, dataPartitionPolicyTable);
+              // Lock outside the try block so the finally clause only releases a held lock
+              dataPartitionPolicyTable.acquireLock();
               try {
                 // Put all DataRegionGroups into the DataPartitionPolicyTable
                 dataPartitionPolicyTable.reBalanceDataPartitionPolicy(
                     getPartitionManager()
@@ -233,6 +246,8 @@ public class PartitionBalancer {
                     getPartitionManager().getLastDataAllotTable(database));
               } catch (DatabaseNotExistsException e) {
                 LOGGER.error("Database {} not exists when setupPartitionBalancer", database);
+              } finally {
+                dataPartitionPolicyTable.releaseLock();
               }
             });
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java
index ed297abf4d7a4a85daae0972de6b33f25ee1f3f7..4f1727d88b937df84baad628db76d94c8ed93860 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java
@@ -25,6 +25,9 @@ import org.apache.iotdb.commons.structure.BalanceTreeMap;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,6 +37,8 @@ import java.util.concurrent.locks.ReentrantLock;
 
 public class DataPartitionPolicyTable {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataPartitionPolicyTable.class);
+
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
   private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum();
 
@@ -69,6 +74,12 @@ public class DataPartitionPolicyTable {
     dataAllotMap.put(seriesPartitionSlot, regionGroupId);
     seriesPartitionSlotCounter.put(
         regionGroupId, seriesPartitionSlotCounter.get(regionGroupId) + 1);
+    LOGGER.info(
+        "[ActivateDataAllotTable] Activate SeriesPartitionSlot {} "
+            + "to RegionGroup {}, SeriesPartitionSlot Count: {}",
+        seriesPartitionSlot,
+        regionGroupId,
+        seriesPartitionSlotCounter.get(regionGroupId));
     return regionGroupId;
   }
 
@@ -102,6 +113,8 @@ public class DataPartitionPolicyTable {
     int mu = SERIES_SLOT_NUM / dataRegionGroups.size();
     for (TSeriesPartitionSlot seriesPartitionSlot : seriesPartitionSlots) {
       if (!dataAllotMap.containsKey(seriesPartitionSlot)) {
+        // Skip unallocated SeriesPartitionSlot
+        // They will be activated when allocating DataPartition
         continue;
       }
 
@@ -109,6 +122,7 @@ public class DataPartitionPolicyTable {
       int seriesPartitionSlotCount = seriesPartitionSlotCounter.get(regionGroupId);
       if (seriesPartitionSlotCount > mu) {
         // Remove from dataAllotMap if the number of SeriesSlots is greater than mu
+        // They will be re-activated when allocating DataPartition
         dataAllotMap.remove(seriesPartitionSlot);
         seriesPartitionSlotCounter.put(regionGroupId, seriesPartitionSlotCount - 1);
       }
@@ -143,6 +157,24 @@ public class DataPartitionPolicyTable {
     }
   }
 
+  /**
+   * Logs the SeriesPartitionSlot count currently recorded for each RegionGroup at INFO level.
+   *
+   * @param database the database name included in every log entry
+   */
+  public void logDataAllotTable(String database) {
+    seriesPartitionSlotCounter
+        .keySet()
+        .forEach(
+            regionGroupId ->
+                LOGGER.info(
+                    "[ReBalanceDataAllotTable] Database: {}, "
+                        + "RegionGroupId: {}, SeriesPartitionSlot Count: {}",
+                    database,
+                    regionGroupId,
+                    seriesPartitionSlotCounter.get(regionGroupId)));
+  }
+
   public void acquireLock() {
     dataAllotTableLock.lock();
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index ac0df2916a916a31a48ce3ca50ba7a0cd031d3fd..7797d20a106d0f29913fd10327388ceee2460875 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -508,7 +508,7 @@ public class PartitionManager {
       Map unassignedPartitionSlotsCountMap, TConsensusGroupType consensusGroupType)
       throws DatabaseNotExistsException, NotEnoughDataNodeException {
 
-    // Map
+    // Map
     Map allotmentMap = new ConcurrentHashMap<>();
 
     for (Map.Entry entry : unassignedPartitionSlotsCountMap.entrySet()) {
@@ -531,7 +531,7 @@ public class PartitionManager {
       Map unassignedPartitionSlotsCountMap, TConsensusGroupType consensusGroupType)
       throws NotEnoughDataNodeException, DatabaseNotExistsException {
 
-    // Map
+    // Map
     Map allotmentMap = new ConcurrentHashMap<>();
 
     for (Map.Entry entry : unassignedPartitionSlotsCountMap.entrySet()) {
@@ -544,15 +544,20 @@ public class PartitionManager {
       float slotCount =
           (float) partitionInfo.getAssignedSeriesPartitionSlotsCount(database)
               + unassignedPartitionSlotsCount;
-      float maxRegionGroupCount =
+      float maxRegionGroupNum =
           getClusterSchemaManager().getMaxRegionGroupNum(database, consensusGroupType);
       float maxSlotCount = CONF.getSeriesSlotNum();
 
       /* RegionGroup extension is required in the following cases */
-      // 1. The number of current RegionGroup of the StorageGroup is less than the minimum number
+      // 1. The number of current RegionGroup of the Database is less than the minimum number
       int minRegionGroupNum =
           getClusterSchemaManager().getMinRegionGroupNum(database, consensusGroupType);
-      if (allocatedRegionGroupCount < minRegionGroupNum) {
+      if (allocatedRegionGroupCount < minRegionGroupNum
+          // Ensure the number of RegionGroups is enough
+          // for current SeriesPartitionSlots after extension
+          // Otherwise, more RegionGroups should be extended through case 2.
+          && slotCount <= (maxSlotCount / maxRegionGroupNum) * minRegionGroupNum) {
+
         // Let the sum of unassignedPartitionSlotsCount and allocatedRegionGroupCount
         // no less than the minRegionGroupNum
         int delta =
@@ -560,32 +565,33 @@ public class PartitionManager {
                 Math.min(
                     unassignedPartitionSlotsCount, minRegionGroupNum - allocatedRegionGroupCount);
         allotmentMap.put(database, delta);
+        // Case 1 already decided the allotment: skip cases 2 and 3 so they cannot overwrite it
         continue;
       }
 
       // 2. The average number of partitions held by each Region will be greater than the
       // expected average number after the partition allocation is completed
-      if (allocatedRegionGroupCount < maxRegionGroupCount
-          && slotCount / allocatedRegionGroupCount > maxSlotCount / maxRegionGroupCount) {
+      if (allocatedRegionGroupCount < maxRegionGroupNum
+          && slotCount / allocatedRegionGroupCount > maxSlotCount / maxRegionGroupNum) {
         // The delta is equal to the smallest integer solution that satisfies the inequality:
-        // slotCount / (allocatedRegionGroupCount + delta) < maxSlotCount / maxRegionGroupCount
+        // slotCount / (allocatedRegionGroupCount + delta) < maxSlotCount / maxRegionGroupNum
         int delta =
             Math.min(
-                (int) (maxRegionGroupCount - allocatedRegionGroupCount),
+                (int) (maxRegionGroupNum - allocatedRegionGroupCount),
                 Math.max(
                     1,
                     (int)
                         Math.ceil(
-                            slotCount * maxRegionGroupCount / maxSlotCount
+                            slotCount * maxRegionGroupNum / maxSlotCount
                                 - allocatedRegionGroupCount)));
         allotmentMap.put(database, delta);
         continue;
       }
 
-      // 3. All RegionGroups in the specified StorageGroup are disabled currently
+      // 3. All RegionGroups in the specified Database are disabled currently
       if (allocatedRegionGroupCount
               == filterRegionGroupThroughStatus(database, RegionGroupStatus.Disabled).size()
-          && allocatedRegionGroupCount < maxRegionGroupCount) {
+          && allocatedRegionGroupCount < maxRegionGroupNum) {
         allotmentMap.put(database, 1);
       }
     }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
index 1fe5ce1bcdf3195de9d224817ec9c0910b085bbd..20df26ffcb4c72f24bb42a9d374aa89f4864a1c9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
@@ -77,6 +77,11 @@ public class BalanceTreeMap> {
     return keyValueMap.getOrDefault(key, null);
   }
 
+  /** Returns the key set of the underlying {@code keyValueMap}. */
+  public Set keySet() {
+    return keyValueMap.keySet();
+  }
+
   public boolean containsKey(K key) {
     return keyValueMap.containsKey(key);
   }