未验证 提交 cc2cadb7 编写于 作者: A Alexander Kazakov 提交者: GitHub

Merge pull request #11893 from excitoon-favorites/autostartmoves

In *MergeTree: Parts moving task shall be started if new storage policy needs them
......@@ -1495,6 +1495,8 @@ void MergeTreeData::changeSettings(
{
if (new_settings)
{
bool has_storage_policy_changed = false;
const auto & new_changes = new_settings->as<const ASTSetQuery &>().changes;
for (const auto & change : new_changes)
......@@ -1503,28 +1505,34 @@ void MergeTreeData::changeSettings(
StoragePolicyPtr new_storage_policy = global_context.getStoragePolicy(change.value.safeGet<String>());
StoragePolicyPtr old_storage_policy = getStoragePolicy();
checkStoragePolicy(new_storage_policy);
/// StoragePolicy of different version or name is guaranteed to have different pointer
if (new_storage_policy != old_storage_policy)
{
checkStoragePolicy(new_storage_policy);
std::unordered_set<String> all_diff_disk_names;
for (const auto & disk : new_storage_policy->getDisks())
all_diff_disk_names.insert(disk->getName());
for (const auto & disk : old_storage_policy->getDisks())
all_diff_disk_names.erase(disk->getName());
std::unordered_set<String> all_diff_disk_names;
for (const auto & disk : new_storage_policy->getDisks())
all_diff_disk_names.insert(disk->getName());
for (const auto & disk : old_storage_policy->getDisks())
all_diff_disk_names.erase(disk->getName());
for (const String & disk_name : all_diff_disk_names)
{
auto disk = new_storage_policy->getDiskByName(disk_name);
if (disk->exists(relative_data_path))
throw Exception("New storage policy contain disks which already contain data of a table with the same name", ErrorCodes::LOGICAL_ERROR);
}
for (const String & disk_name : all_diff_disk_names)
{
auto disk = new_storage_policy->getDiskByName(disk_name);
if (disk->exists(relative_data_path))
throw Exception("New storage policy contain disks which already contain data of a table with the same name", ErrorCodes::LOGICAL_ERROR);
}
for (const String & disk_name : all_diff_disk_names)
{
auto disk = new_storage_policy->getDiskByName(disk_name);
disk->createDirectories(relative_data_path);
disk->createDirectories(relative_data_path + "detached");
for (const String & disk_name : all_diff_disk_names)
{
auto disk = new_storage_policy->getDiskByName(disk_name);
disk->createDirectories(relative_data_path);
disk->createDirectories(relative_data_path + "detached");
}
/// FIXME how would that be done while reloading configuration???
has_storage_policy_changed = true;
}
/// FIXME how would that be done while reloading configuration???
}
MergeTreeSettings copy = *getSettings();
......@@ -1533,6 +1541,9 @@ void MergeTreeData::changeSettings(
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
new_metadata.setSettingsChanges(new_settings);
setInMemoryMetadata(new_metadata);
if (has_storage_policy_changed)
startBackgroundMovesIfNeeded();
}
}
......@@ -3291,12 +3302,11 @@ bool MergeTreeData::selectPartsAndMove()
bool MergeTreeData::areBackgroundMovesNeeded() const
{
auto policy = getStoragePolicy();
auto metadata_snapshot = getInMemoryMetadataPtr();
if (policy->getVolumes().size() > 1)
return true;
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1 && metadata_snapshot->hasAnyMoveTTL();
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1;
}
bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space)
......
......@@ -794,8 +794,6 @@ protected:
void checkStoragePolicy(const StoragePolicyPtr & new_storage_policy) const;
void setStoragePolicy(const String & new_storage_policy_name, bool only_check = false);
/// Calculates column sizes in compressed form for the current state of data_parts. Call with data_parts mutex locked.
void calculateColumnSizesImpl();
/// Adds or subtracts the contribution of the part to compressed column sizes.
......@@ -873,6 +871,8 @@ private:
CurrentlyMovingPartsTagger checkPartsForMove(const DataPartsVector & parts, SpacePtr space);
bool canUsePolymorphicParts(const MergeTreeSettings & settings, String * out_reason) const;
virtual void startBackgroundMovesIfNeeded() = 0;
};
}
......@@ -102,12 +102,7 @@ void StorageMergeTree::startup()
/// Ensure that thread started only after assignment to 'merging_mutating_task_handle' is done.
merge_pool.startTask(merging_mutating_task_handle);
if (areBackgroundMovesNeeded())
{
auto & move_pool = global_context.getBackgroundMovePool();
moving_task_handle = move_pool.createTask([this] { return movePartsTask(); });
move_pool.startTask(moving_task_handle);
}
startBackgroundMovesIfNeeded();
}
catch (...)
{
......@@ -464,6 +459,18 @@ bool StorageMergeTree::isMutationDone(Int64 mutation_version) const
return true;
}
void StorageMergeTree::startBackgroundMovesIfNeeded()
{
if (areBackgroundMovesNeeded() && !moving_task_handle)
{
auto & move_pool = global_context.getBackgroundMovePool();
moving_task_handle = move_pool.createTask([this] { return movePartsTask(); });
move_pool.startTask(moving_task_handle);
}
}
std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() const
{
std::lock_guard lock(currently_processing_in_background_mutex);
......
......@@ -159,6 +159,8 @@ private:
/// Just checks versions of each active data part
bool isMutationDone(Int64 mutation_version) const;
void startBackgroundMovesIfNeeded() override;
friend class MergeTreeBlockOutputStream;
friend class MergeTreeData;
friend struct CurrentlyMergingPartsTagger;
......
......@@ -3263,12 +3263,7 @@ void StorageReplicatedMergeTree::startup()
pool.startTask(queue_task_handle);
}
if (areBackgroundMovesNeeded())
{
auto & pool = global_context.getBackgroundMovePool();
move_parts_task_handle = pool.createTask([this] { return movePartsTask(); });
pool.startTask(move_parts_task_handle);
}
startBackgroundMovesIfNeeded();
}
catch (...)
{
......@@ -5702,4 +5697,16 @@ MutationCommands StorageReplicatedMergeTree::getFirtsAlterMutationCommandsForPar
{
return queue.getFirstAlterMutationCommandsForPart(part);
}
void StorageReplicatedMergeTree::startBackgroundMovesIfNeeded()
{
if (areBackgroundMovesNeeded() && !move_parts_task_handle)
{
auto & pool = global_context.getBackgroundMovePool();
move_parts_task_handle = pool.createTask([this] { return movePartsTask(); });
pool.startTask(move_parts_task_handle);
}
}
}
......@@ -551,6 +551,8 @@ private:
MutationCommands getFirtsAlterMutationCommandsForPart(const DataPartPtr & part) const override;
void startBackgroundMovesIfNeeded() override;
protected:
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
*/
......
......@@ -40,6 +40,20 @@
</volumes>
</jbods_with_external>
<default_with_small_jbod_with_external>
<volumes>
<default>
<disk>default</disk>
</default>
<main>
<disk>jbod1</disk>
</main>
<external>
<disk>external</disk>
</external>
</volumes>
</default_with_small_jbod_with_external>
<small_jbod_with_external>
<volumes>
<main>
......
......@@ -160,6 +160,53 @@ def test_inserts_to_disk_work(started_cluster, name, engine, positive):
pass
@pytest.mark.parametrize("name,engine", [
("mt_test_moves_work_after_storage_policy_change","MergeTree()"),
("replicated_mt_test_moves_work_after_storage_policy_change","ReplicatedMergeTree('/clickhouse/test_moves_work_after_storage_policy_change', '1')"),
])
def test_moves_work_after_storage_policy_change(started_cluster, name, engine):
try:
node1.query("""
CREATE TABLE {name} (
s1 String,
d1 DateTime
) ENGINE = {engine}
ORDER BY tuple()
""".format(name=name, engine=engine))
node1.query("""ALTER TABLE {name} MODIFY SETTING storage_policy='default_with_small_jbod_with_external'""".format(name=name))
# Second expression is preferred because d1 > now()-3600.
node1.query("""ALTER TABLE {name} MODIFY TTL now()-3600 TO DISK 'jbod1', d1 TO DISK 'external'""".format(name=name))
wait_expire_1 = 12
wait_expire_2 = 4
time_1 = time.time() + wait_expire_1
time_2 = time.time() + wait_expire_1 + wait_expire_2
wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
wait_expire_1_thread.start()
data = [] # 10MB in total
for i in range(10):
data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1))) # 1MB row
node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"jbod1"}
wait_expire_1_thread.join()
time.sleep(wait_expire_2/2)
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"external"}
assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "10"
finally:
node1.query("DROP TABLE IF EXISTS {}".format(name))
@pytest.mark.parametrize("name,engine,positive", [
("mt_test_moves_to_disk_do_not_work","MergeTree()",0),
("replicated_mt_test_moves_to_disk_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_disk_do_not_work', '1')",0),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册