未验证 提交 6357533a 编写于 作者: Y YongzaoDan 提交者: GitHub

[IOTDB-6125] Fix DataPartition allocation bug when insert big batch data (#10924)

上级 1dce787c
...@@ -205,10 +205,19 @@ public class PartitionBalancer { ...@@ -205,10 +205,19 @@ public class PartitionBalancer {
*/ */
public void reBalanceDataPartitionPolicy(String database) { public void reBalanceDataPartitionPolicy(String database) {
try { try {
dataPartitionPolicyTableMap DataPartitionPolicyTable dataPartitionPolicyTable =
.computeIfAbsent(database, empty -> new DataPartitionPolicyTable()) dataPartitionPolicyTableMap.computeIfAbsent(
.reBalanceDataPartitionPolicy( database, empty -> new DataPartitionPolicyTable());
getPartitionManager().getAllRegionGroupIds(database, TConsensusGroupType.DataRegion));
try {
dataPartitionPolicyTable.acquireLock();
dataPartitionPolicyTable.reBalanceDataPartitionPolicy(
getPartitionManager().getAllRegionGroupIds(database, TConsensusGroupType.DataRegion));
dataPartitionPolicyTable.logDataAllotTable(database);
} finally {
dataPartitionPolicyTable.releaseLock();
}
} catch (DatabaseNotExistsException e) { } catch (DatabaseNotExistsException e) {
LOGGER.error("Database {} not exists when updateDataAllotTable", database); LOGGER.error("Database {} not exists when updateDataAllotTable", database);
} }
...@@ -224,6 +233,8 @@ public class PartitionBalancer { ...@@ -224,6 +233,8 @@ public class PartitionBalancer {
DataPartitionPolicyTable dataPartitionPolicyTable = new DataPartitionPolicyTable(); DataPartitionPolicyTable dataPartitionPolicyTable = new DataPartitionPolicyTable();
dataPartitionPolicyTableMap.put(database, dataPartitionPolicyTable); dataPartitionPolicyTableMap.put(database, dataPartitionPolicyTable);
try { try {
dataPartitionPolicyTable.acquireLock();
// Put all DataRegionGroups into the DataPartitionPolicyTable // Put all DataRegionGroups into the DataPartitionPolicyTable
dataPartitionPolicyTable.reBalanceDataPartitionPolicy( dataPartitionPolicyTable.reBalanceDataPartitionPolicy(
getPartitionManager() getPartitionManager()
...@@ -233,6 +244,8 @@ public class PartitionBalancer { ...@@ -233,6 +244,8 @@ public class PartitionBalancer {
getPartitionManager().getLastDataAllotTable(database)); getPartitionManager().getLastDataAllotTable(database));
} catch (DatabaseNotExistsException e) { } catch (DatabaseNotExistsException e) {
LOGGER.error("Database {} not exists when setupPartitionBalancer", database); LOGGER.error("Database {} not exists when setupPartitionBalancer", database);
} finally {
dataPartitionPolicyTable.releaseLock();
} }
}); });
} }
......
...@@ -25,6 +25,9 @@ import org.apache.iotdb.commons.structure.BalanceTreeMap; ...@@ -25,6 +25,9 @@ import org.apache.iotdb.commons.structure.BalanceTreeMap;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
...@@ -34,6 +37,8 @@ import java.util.concurrent.locks.ReentrantLock; ...@@ -34,6 +37,8 @@ import java.util.concurrent.locks.ReentrantLock;
public class DataPartitionPolicyTable { public class DataPartitionPolicyTable {
private static final Logger LOGGER = LoggerFactory.getLogger(DataPartitionPolicyTable.class);
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum(); private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum();
...@@ -69,6 +74,12 @@ public class DataPartitionPolicyTable { ...@@ -69,6 +74,12 @@ public class DataPartitionPolicyTable {
dataAllotMap.put(seriesPartitionSlot, regionGroupId); dataAllotMap.put(seriesPartitionSlot, regionGroupId);
seriesPartitionSlotCounter.put( seriesPartitionSlotCounter.put(
regionGroupId, seriesPartitionSlotCounter.get(regionGroupId) + 1); regionGroupId, seriesPartitionSlotCounter.get(regionGroupId) + 1);
LOGGER.info(
"[ActivateDataAllotTable] Activate SeriesPartitionSlot {} "
+ "to RegionGroup {}, SeriesPartitionSlot Count: {}",
seriesPartitionSlot,
regionGroupId,
seriesPartitionSlotCounter.get(regionGroupId));
return regionGroupId; return regionGroupId;
} }
...@@ -102,6 +113,8 @@ public class DataPartitionPolicyTable { ...@@ -102,6 +113,8 @@ public class DataPartitionPolicyTable {
int mu = SERIES_SLOT_NUM / dataRegionGroups.size(); int mu = SERIES_SLOT_NUM / dataRegionGroups.size();
for (TSeriesPartitionSlot seriesPartitionSlot : seriesPartitionSlots) { for (TSeriesPartitionSlot seriesPartitionSlot : seriesPartitionSlots) {
if (!dataAllotMap.containsKey(seriesPartitionSlot)) { if (!dataAllotMap.containsKey(seriesPartitionSlot)) {
// Skip unallocated SeriesPartitionSlot
// They will be activated when allocating DataPartition
continue; continue;
} }
...@@ -109,6 +122,7 @@ public class DataPartitionPolicyTable { ...@@ -109,6 +122,7 @@ public class DataPartitionPolicyTable {
int seriesPartitionSlotCount = seriesPartitionSlotCounter.get(regionGroupId); int seriesPartitionSlotCount = seriesPartitionSlotCounter.get(regionGroupId);
if (seriesPartitionSlotCount > mu) { if (seriesPartitionSlotCount > mu) {
// Remove from dataAllotMap if the number of SeriesSlots is greater than mu // Remove from dataAllotMap if the number of SeriesSlots is greater than mu
// They will be re-activated when allocating DataPartition
dataAllotMap.remove(seriesPartitionSlot); dataAllotMap.remove(seriesPartitionSlot);
seriesPartitionSlotCounter.put(regionGroupId, seriesPartitionSlotCount - 1); seriesPartitionSlotCounter.put(regionGroupId, seriesPartitionSlotCount - 1);
} }
...@@ -143,6 +157,19 @@ public class DataPartitionPolicyTable { ...@@ -143,6 +157,19 @@ public class DataPartitionPolicyTable {
} }
} }
/**
 * Logs one INFO line per RegionGroup currently present in seriesPartitionSlotCounter, reporting
 * the SeriesPartitionSlot count recorded for that RegionGroup.
 *
 * @param database the database name printed in every log line
 */
public void logDataAllotTable(String database) {
  final String logPattern =
      "[ReBalanceDataAllotTable] Database: {}, "
          + "RegionGroupId: {}, SeriesPartitionSlot Count: {}";
  seriesPartitionSlotCounter
      .keySet()
      .forEach(
          groupId -> {
            // Look up the counter value for this RegionGroup before logging it
            Object slotCount = seriesPartitionSlotCounter.get(groupId);
            LOGGER.info(logPattern, database, groupId, slotCount);
          });
}
public void acquireLock() { public void acquireLock() {
dataAllotTableLock.lock(); dataAllotTableLock.lock();
} }
......
...@@ -508,7 +508,7 @@ public class PartitionManager { ...@@ -508,7 +508,7 @@ public class PartitionManager {
Map<String, Integer> unassignedPartitionSlotsCountMap, TConsensusGroupType consensusGroupType) Map<String, Integer> unassignedPartitionSlotsCountMap, TConsensusGroupType consensusGroupType)
throws DatabaseNotExistsException, NotEnoughDataNodeException { throws DatabaseNotExistsException, NotEnoughDataNodeException {
// Map<StorageGroup, Region allotment> // Map<Database, Region allotment>
Map<String, Integer> allotmentMap = new ConcurrentHashMap<>(); Map<String, Integer> allotmentMap = new ConcurrentHashMap<>();
for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) { for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) {
...@@ -531,7 +531,7 @@ public class PartitionManager { ...@@ -531,7 +531,7 @@ public class PartitionManager {
Map<String, Integer> unassignedPartitionSlotsCountMap, TConsensusGroupType consensusGroupType) Map<String, Integer> unassignedPartitionSlotsCountMap, TConsensusGroupType consensusGroupType)
throws NotEnoughDataNodeException, DatabaseNotExistsException { throws NotEnoughDataNodeException, DatabaseNotExistsException {
// Map<StorageGroup, Region allotment> // Map<Database, Region allotment>
Map<String, Integer> allotmentMap = new ConcurrentHashMap<>(); Map<String, Integer> allotmentMap = new ConcurrentHashMap<>();
for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) { for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) {
...@@ -544,15 +544,20 @@ public class PartitionManager { ...@@ -544,15 +544,20 @@ public class PartitionManager {
float slotCount = float slotCount =
(float) partitionInfo.getAssignedSeriesPartitionSlotsCount(database) (float) partitionInfo.getAssignedSeriesPartitionSlotsCount(database)
+ unassignedPartitionSlotsCount; + unassignedPartitionSlotsCount;
float maxRegionGroupCount = float maxRegionGroupNum =
getClusterSchemaManager().getMaxRegionGroupNum(database, consensusGroupType); getClusterSchemaManager().getMaxRegionGroupNum(database, consensusGroupType);
float maxSlotCount = CONF.getSeriesSlotNum(); float maxSlotCount = CONF.getSeriesSlotNum();
/* RegionGroup extension is required in the following cases */ /* RegionGroup extension is required in the following cases */
// 1. The number of current RegionGroup of the StorageGroup is less than the minimum number // 1. The number of current RegionGroup of the Database is less than the minimum number
int minRegionGroupNum = int minRegionGroupNum =
getClusterSchemaManager().getMinRegionGroupNum(database, consensusGroupType); getClusterSchemaManager().getMinRegionGroupNum(database, consensusGroupType);
if (allocatedRegionGroupCount < minRegionGroupNum) { if (allocatedRegionGroupCount < minRegionGroupNum
// Ensure the number of RegionGroups is enough
// for current SeriesPartitionSlots after extension
// Otherwise, more RegionGroups should be extended through case 2.
&& slotCount <= (maxSlotCount / maxRegionGroupNum) * minRegionGroupNum) {
// Let the sum of unassignedPartitionSlotsCount and allocatedRegionGroupCount // Let the sum of unassignedPartitionSlotsCount and allocatedRegionGroupCount
// no less than the minRegionGroupNum // no less than the minRegionGroupNum
int delta = int delta =
...@@ -560,32 +565,31 @@ public class PartitionManager { ...@@ -560,32 +565,31 @@ public class PartitionManager {
Math.min( Math.min(
unassignedPartitionSlotsCount, minRegionGroupNum - allocatedRegionGroupCount); unassignedPartitionSlotsCount, minRegionGroupNum - allocatedRegionGroupCount);
allotmentMap.put(database, delta); allotmentMap.put(database, delta);
continue;
} }
// 2. The average number of partitions held by each Region will be greater than the // 2. The average number of partitions held by each Region will be greater than the
// expected average number after the partition allocation is completed // expected average number after the partition allocation is completed
if (allocatedRegionGroupCount < maxRegionGroupCount if (allocatedRegionGroupCount < maxRegionGroupNum
&& slotCount / allocatedRegionGroupCount > maxSlotCount / maxRegionGroupCount) { && slotCount / allocatedRegionGroupCount > maxSlotCount / maxRegionGroupNum) {
// The delta is equal to the smallest integer solution that satisfies the inequality: // The delta is equal to the smallest integer solution that satisfies the inequality:
// slotCount / (allocatedRegionGroupCount + delta) < maxSlotCount / maxRegionGroupCount // slotCount / (allocatedRegionGroupCount + delta) < maxSlotCount / maxRegionGroupNum
int delta = int delta =
Math.min( Math.min(
(int) (maxRegionGroupCount - allocatedRegionGroupCount), (int) (maxRegionGroupNum - allocatedRegionGroupCount),
Math.max( Math.max(
1, 1,
(int) (int)
Math.ceil( Math.ceil(
slotCount * maxRegionGroupCount / maxSlotCount slotCount * maxRegionGroupNum / maxSlotCount
- allocatedRegionGroupCount))); - allocatedRegionGroupCount)));
allotmentMap.put(database, delta); allotmentMap.put(database, delta);
continue; continue;
} }
// 3. All RegionGroups in the specified StorageGroup are disabled currently // 3. All RegionGroups in the specified Database are disabled currently
if (allocatedRegionGroupCount if (allocatedRegionGroupCount
== filterRegionGroupThroughStatus(database, RegionGroupStatus.Disabled).size() == filterRegionGroupThroughStatus(database, RegionGroupStatus.Disabled).size()
&& allocatedRegionGroupCount < maxRegionGroupCount) { && allocatedRegionGroupCount < maxRegionGroupNum) {
allotmentMap.put(database, 1); allotmentMap.put(database, 1);
} }
} }
......
...@@ -77,6 +77,10 @@ public class BalanceTreeMap<K, V extends Comparable<V>> { ...@@ -77,6 +77,10 @@ public class BalanceTreeMap<K, V extends Comparable<V>> {
return keyValueMap.getOrDefault(key, null); return keyValueMap.getOrDefault(key, null);
} }
/**
 * Returns a read-only view of the keys currently stored in keyValueMap.
 *
 * <p>The returned set is a view backed by keyValueMap, so it reflects later changes to the map.
 * Modification through the view is rejected with {@link UnsupportedOperationException}, so
 * callers cannot remove keys without going through this class's own methods.
 *
 * @return an unmodifiable view of the key set
 */
public Set<K> keySet() {
  // Fully-qualified to avoid depending on an import that is not visible in this hunk
  return java.util.Collections.unmodifiableSet(keyValueMap.keySet());
}
public boolean containsKey(K key) { public boolean containsKey(K key) {
return keyValueMap.containsKey(key); return keyValueMap.containsKey(key);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册