提交 2f80c150 编写于 作者: A Alexey Zatelepin 提交者: alexey-milovidov

save MergeTree data format version and parse part names depending on it [#CLICKHOUSE-3000]

上级 937d3f38
......@@ -4,7 +4,8 @@
namespace DB
{
ActiveDataPartSet::ActiveDataPartSet(const Strings & names)
ActiveDataPartSet::ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const Strings & names)
: format_version(format_version_)
{
for (const auto & name : names)
addImpl(name);
......@@ -20,7 +21,7 @@ void ActiveDataPartSet::add(const String & name)
void ActiveDataPartSet::addImpl(const String & name)
{
auto part_info = MergeTreePartInfo::fromPartName(name);
auto part_info = MergeTreePartInfo::fromPartName(name, format_version);
if (!getContainingPartImpl(part_info).empty())
return;
......@@ -53,7 +54,7 @@ void ActiveDataPartSet::addImpl(const String & name)
String ActiveDataPartSet::getContainingPart(const String & part_name) const
{
std::lock_guard<std::mutex> lock(mutex);
return getContainingPartImpl(MergeTreePartInfo::fromPartName(part_name));
return getContainingPartImpl(MergeTreePartInfo::fromPartName(part_name, format_version));
}
......
......@@ -18,8 +18,8 @@ namespace DB
class ActiveDataPartSet
{
public:
ActiveDataPartSet() {}
ActiveDataPartSet(const Strings & names);
ActiveDataPartSet(MergeTreeDataFormatVersion format_version_) : format_version(format_version_) {}
ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const Strings & names);
void add(const String & name);
......@@ -31,6 +31,8 @@ public:
size_t size() const;
private:
MergeTreeDataFormatVersion format_version;
mutable std::mutex mutex;
std::map<MergeTreePartInfo, String> part_info_to_name;
......
......@@ -61,6 +61,7 @@ namespace ErrorCodes
{
extern const int MEMORY_LIMIT_EXCEEDED;
extern const int SYNTAX_ERROR;
extern const int CORRUPTED_DATA;
}
......@@ -130,6 +131,23 @@ MergeTreeData::MergeTreeData(
/// Creating directories, if not exist.
Poco::File(full_path).createDirectories();
Poco::File(full_path + "detached").createDirectory();
String version_file_path = full_path + "format_version.txt";
if (!attach)
{
format_version = 0;
WriteBufferFromFile buf(version_file_path);
writeIntText(format_version.toUnderType(), buf);
}
else if (Poco::File(version_file_path).exists())
{
ReadBufferFromFile buf(version_file_path);
readIntText(format_version, buf);
if (!buf.eof())
throw Exception("Bad version file: " + version_file_path, ErrorCodes::CORRUPTED_DATA);
}
else
format_version = 0;
}
......@@ -362,7 +380,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
for (const String & file_name : part_file_names)
{
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(file_name, &part_info))
if (!MergeTreePartInfo::tryParsePartName(file_name, &part_info, format_version))
continue;
MutableDataPartPtr part = std::make_shared<DataPart>(*this, file_name, part_info);
......@@ -414,7 +432,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
continue;
MergeTreePartInfo contained_part_info;
if (!MergeTreePartInfo::tryParsePartName(contained_name, &contained_part_info))
if (!MergeTreePartInfo::tryParsePartName(contained_name, &contained_part_info, format_version))
continue;
if (part->info.contains(contained_part_info))
......@@ -1249,8 +1267,11 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
if (increment)
part->info.min_block = part->info.max_block = increment->get();
String new_name = MergeTreePartInfo::getPartName(
part->getMinDate(), part->getMaxDate(), part->info.min_block, part->info.max_block, part->info.level);
String new_name;
if (format_version == 0)
new_name = part->info.getPartNameV0(part->getMinDate(), part->getMaxDate());
else
new_name = part->info.getPartName();
LOG_TRACE(log, "Renaming temporary part " << part->relative_path << " to " << new_name << ".");
......@@ -1526,7 +1547,7 @@ void MergeTreeData::delayInsertIfNeeded(Poco::Event * until)
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name)
{
auto part_info = MergeTreePartInfo::fromPartName(part_name);
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
std::lock_guard<std::mutex> lock(data_parts_mutex);
......@@ -1553,7 +1574,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String &
MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_name)
{
auto part_info = MergeTreePartInfo::fromPartName(part_name);
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
std::lock_guard<std::mutex> lock(all_data_parts_mutex);
auto it = all_data_parts.lower_bound(part_info);
......@@ -1721,7 +1742,7 @@ size_t MergeTreeData::getPartitionSize(const std::string & partition_id) const
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
{
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(it.name(), &part_info))
if (!MergeTreePartInfo::tryParsePartName(it.name(), &part_info, format_version))
continue;
if (part_info.partition_id != partition_id)
continue;
......
......@@ -466,6 +466,8 @@ public:
/// For determining the partition id of inserted blocks.
String getPartitionIDFromData(const Row & partition);
MergeTreeDataFormatVersion format_version;
Context & context;
const String date_column_name;
const ASTPtr sampling_expression;
......
#pragma once
#include <common/strong_typedef.h>
namespace DB
{
STRONG_TYPEDEF(UInt32, MergeTreeDataFormatVersion);
}
......@@ -84,6 +84,8 @@ void MergeTreeDataMerger::FuturePart::assign(MergeTreeData::DataPartsVector part
part_info.max_block = parts.back()->info.max_block;
part_info.level = max_level + 1;
if (parts.front()->storage.format_version == 0)
{
DayNum_t min_date = DayNum_t(std::numeric_limits<UInt16>::max());
DayNum_t max_date = DayNum_t(std::numeric_limits<UInt16>::min());
for (const auto & part : parts)
......@@ -92,8 +94,10 @@ void MergeTreeDataMerger::FuturePart::assign(MergeTreeData::DataPartsVector part
max_date = std::max(max_date, part->getMaxDate());
}
name = MergeTreePartInfo::getPartName(
min_date, max_date, part_info.min_block, part_info.max_block, part_info.level);
name = part_info.getPartNameV0(min_date, max_date);
}
else
name = part_info.getPartName();
}
MergeTreeDataMerger::MergeTreeDataMerger(MergeTreeData & data_, const BackgroundProcessingPool & pool_)
......@@ -1069,9 +1073,11 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
size_t shard_no = entry.first;
MergeTreeData::MutableDataPartPtr & part_from_shard = entry.second;
std::string new_name = MergeTreePartInfo::getPartName(
part_from_shard->getMinDate(), part_from_shard->getMaxDate(),
part_from_shard->info.min_block, part_from_shard->info.max_block, part_from_shard->info.level);
std::string new_name;
if (data.format_version == 0)
new_name = part_from_shard->info.getPartNameV0(part_from_shard->getMinDate(), part_from_shard->getMaxDate());
else
new_name = part_from_shard->info.getPartName();
std::string new_relative_path = "reshard/" + toString(shard_no) + "/" + new_name;
part_from_shard->renameTo(new_relative_path);
......
......@@ -337,6 +337,11 @@ void MinMaxIndex::merge(const MinMaxIndex & other)
}
MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String & name_)
: storage(storage_), name(name_), info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
{
}
/// Returns the size of .bin file for column `name` if found, zero otherwise.
size_t MergeTreeDataPart::getColumnCompressedSize(const String & name) const
{
......@@ -657,6 +662,8 @@ void MergeTreeDataPart::loadIndex()
void MergeTreeDataPart::loadPartitionAndMinMaxIndex()
{
if (storage.format_version == 0)
{
DayNum_t min_date;
DayNum_t max_date;
MergeTreePartInfo::parseMinMaxDatesFromPartName(name, min_date, max_date);
......@@ -667,6 +674,9 @@ void MergeTreeDataPart::loadPartitionAndMinMaxIndex()
minmax_idx.min_column_values = Row(1, static_cast<UInt64>(min_date));
minmax_idx.max_column_values = Row(1, static_cast<UInt64>(max_date));
minmax_idx.initialized = true;
}
else
throw Exception("TODO", ErrorCodes::LOGICAL_ERROR);
}
void MergeTreeDataPart::loadChecksums(bool require)
......
......@@ -104,16 +104,13 @@ struct MergeTreeDataPart
using Checksums = MergeTreeDataPartChecksums;
using Checksum = MergeTreeDataPartChecksums::Checksum;
MergeTreeDataPart(MergeTreeData & storage_, const String & name_)
: storage(storage_), name(name_), info(MergeTreePartInfo::fromPartName(name_))
{
}
MergeTreeDataPart(MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_)
: storage(storage_), name(name_), info(info_)
{
}
MergeTreeDataPart(MergeTreeData & storage_, const String & name_);
/// Returns checksum of column's binary file.
const Checksum * tryGetBinChecksum(const String & name) const;
......
......@@ -134,6 +134,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
MinMaxIndex minmax_idx;
minmax_idx.update(block, data.minmax_idx_columns);
String new_partition_id = data.getPartitionIDFromData(block_with_partition.partition);
MergeTreePartInfo new_part_info(new_partition_id, temp_index, temp_index, 0);
String part_name;
if (data.format_version == 0)
{
DayNum_t min_date(minmax_idx.min_column_values[data.minmax_idx_date_column_pos].get<UInt64>());
DayNum_t max_date(minmax_idx.max_column_values[data.minmax_idx_date_column_pos].get<UInt64>());
......@@ -145,11 +151,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
if (min_month != max_month)
throw Exception("Logical error: part spans more than one month.");
String part_name = MergeTreePartInfo::getPartName(min_date, max_date, temp_index, temp_index, 0);
part_name = new_part_info.getPartNameV0(min_date, max_date);
}
else
part_name = new_part_info.getPartName();
String new_partition_id = data.getPartitionIDFromData(block_with_partition.partition);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
data, part_name, MergeTreePartInfo(new_partition_id, temp_index, temp_index, 0));
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data, part_name, new_part_info);
new_data_part->partition = std::move(block_with_partition.partition);
new_data_part->minmax_idx = std::move(minmax_idx);
new_data_part->relative_path = TMP_PREFIX + part_name;
......
......@@ -12,29 +12,51 @@ namespace ErrorCodes
}
MergeTreePartInfo MergeTreePartInfo::fromPartName(const String & dir_name)
MergeTreePartInfo MergeTreePartInfo::fromPartName(const String & dir_name, MergeTreeDataFormatVersion format_version)
{
MergeTreePartInfo part_info;
if (!tryParsePartName(dir_name, &part_info))
if (!tryParsePartName(dir_name, &part_info, format_version))
throw Exception("Unexpected part name: " + dir_name, ErrorCodes::BAD_DATA_PART_NAME);
return part_info;
}
bool MergeTreePartInfo::tryParsePartName(const String & dir_name, MergeTreePartInfo * part_info)
bool MergeTreePartInfo::tryParsePartName(const String & dir_name, MergeTreePartInfo * part_info, MergeTreeDataFormatVersion format_version)
{
ReadBufferFromString in(dir_name);
String partition_id;
if (format_version == 0)
{
UInt32 min_yyyymmdd = 0;
UInt32 max_yyyymmdd = 0;
if (!tryReadIntText(min_yyyymmdd, in)
|| !checkChar('_', in)
|| !tryReadIntText(max_yyyymmdd, in)
|| !checkChar('_', in))
{
return false;
}
partition_id = toString(min_yyyymmdd / 100);
}
else
{
while (!in.eof())
{
char c;
readChar(c, in);
if (c == '_')
break;
partition_id.push_back(c);
}
}
Int64 min_block_num = 0;
Int64 max_block_num = 0;
UInt32 level = 0;
ReadBufferFromString in(dir_name);
if (!tryReadIntText(min_yyyymmdd, in)
|| !checkChar('_', in)
|| !tryReadIntText(max_yyyymmdd, in)
|| !checkChar('_', in)
|| !tryReadIntText(min_block_num, in)
if (!tryReadIntText(min_block_num, in)
|| !checkChar('_', in)
|| !tryReadIntText(max_block_num, in)
|| !checkChar('_', in)
......@@ -46,7 +68,7 @@ bool MergeTreePartInfo::tryParsePartName(const String & dir_name, MergeTreePartI
if (part_info)
{
part_info->partition_id = dir_name.substr(0, strlen("YYYYMM"));
part_info->partition_id = std::move(partition_id);
part_info->min_block = min_block_num;
part_info->max_block = max_block_num;
part_info->level = level;
......@@ -83,15 +105,31 @@ void MergeTreePartInfo::parseMinMaxDatesFromPartName(const String & dir_name, Da
}
bool MergeTreePartInfo::contains(const String & outer_part_name, const String & inner_part_name)
bool MergeTreePartInfo::contains(const String & outer_part_name, const String & inner_part_name, MergeTreeDataFormatVersion format_version)
{
MergeTreePartInfo outer = fromPartName(outer_part_name);
MergeTreePartInfo inner = fromPartName(inner_part_name);
MergeTreePartInfo outer = fromPartName(outer_part_name, format_version);
MergeTreePartInfo inner = fromPartName(inner_part_name, format_version);
return outer.contains(inner);
}
String MergeTreePartInfo::getPartName(DayNum_t left_date, DayNum_t right_date, Int64 left_id, Int64 right_id, UInt64 level)
String MergeTreePartInfo::getPartName() const
{
WriteBufferFromOwnString wb;
writeString(partition_id, wb);
writeChar('_', wb);
writeIntText(min_block, wb);
writeChar('_', wb);
writeIntText(max_block, wb);
writeChar('_', wb);
writeIntText(level, wb);
return wb.str();
}
String MergeTreePartInfo::getPartNameV0(DayNum_t left_date, DayNum_t right_date) const
{
const auto & date_lut = DateLUT::instance();
......@@ -106,9 +144,9 @@ String MergeTreePartInfo::getPartName(DayNum_t left_date, DayNum_t right_date, I
writeChar('_', wb);
writeIntText(right_date_id, wb);
writeChar('_', wb);
writeIntText(left_id, wb);
writeIntText(min_block, wb);
writeChar('_', wb);
writeIntText(right_id, wb);
writeIntText(max_block, wb);
writeChar('_', wb);
writeIntText(level, wb);
......
#pragma once
#include <Core/Types.h>
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
#include <common/DateLUT.h>
......@@ -37,15 +38,16 @@ struct MergeTreePartInfo
&& level >= rhs.level;
}
static MergeTreePartInfo fromPartName(const String & part_name);
String getPartName() const;
String getPartNameV0(DayNum_t left_date, DayNum_t right_date) const;
static bool tryParsePartName(const String & dir_name, MergeTreePartInfo * part_info);
static MergeTreePartInfo fromPartName(const String & part_name, MergeTreeDataFormatVersion format_version);
static void parseMinMaxDatesFromPartName(const String & dir_name, DayNum_t & min_date, DayNum_t & max_date);
static bool tryParsePartName(const String & dir_name, MergeTreePartInfo * part_info, MergeTreeDataFormatVersion format_version);
static bool contains(const String & outer_part_name, const String & inner_part_name);
static void parseMinMaxDatesFromPartName(const String & part_name, DayNum_t & min_date, DayNum_t & max_date);
static String getPartName(DayNum_t left_date, DayNum_t right_date, Int64 left_id, Int64 right_id, UInt64 level);
static bool contains(const String & outer_part_name, const String & inner_part_name, MergeTreeDataFormatVersion format_version);
};
}
......@@ -175,8 +175,11 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
part->info.max_block = block_number;
part->info.level = 0;
String part_name = MergeTreePartInfo::getPartName(
part->getMinDate(), part->getMaxDate(), block_number, block_number, 0);
String part_name;
if (storage.data.format_version == 0)
part_name = part->info.getPartNameV0(part->getMinDate(), part->getMaxDate());
else
part_name = part->info.getPartName();
part->name = part_name;
......
......@@ -86,7 +86,7 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & par
}
/// If the part is not in ZooKeeper, we'll check if it's at least somewhere.
auto part_info = MergeTreePartInfo::fromPartName(part_name);
auto part_info = MergeTreePartInfo::fromPartName(part_name, storage.data.format_version);
/** The logic is as follows:
* - if some live or inactive replica has such a part, or a part covering it
......@@ -122,7 +122,7 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & par
Strings parts = zookeeper->getChildren(storage.zookeeper_path + "/replicas/" + replica + "/parts");
for (const String & part_on_replica : parts)
{
auto part_on_replica_info = MergeTreePartInfo::fromPartName(part_on_replica);
auto part_on_replica_info = MergeTreePartInfo::fromPartName(part_on_replica, storage.data.format_version);
if (part_on_replica_info.contains(part_info))
{
......
......@@ -421,7 +421,7 @@ void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr z
for (Queue::iterator it = queue.begin(); it != queue.end();)
{
if (((*it)->type == LogEntry::GET_PART || (*it)->type == LogEntry::MERGE_PARTS) &&
MergeTreePartInfo::contains(part_name, (*it)->new_part_name))
MergeTreePartInfo::contains(part_name, (*it)->new_part_name, format_version))
{
if ((*it)->currently_executing)
to_wait.push_back(*it);
......@@ -460,14 +460,14 @@ ReplicatedMergeTreeQueue::Queue ReplicatedMergeTreeQueue::getConflictsForClearCo
{
if (elem->type == LogEntry::MERGE_PARTS || elem->type == LogEntry::GET_PART || elem->type == LogEntry::ATTACH_PART)
{
if (MergeTreePartInfo::contains(entry.new_part_name, elem->new_part_name))
if (MergeTreePartInfo::contains(entry.new_part_name, elem->new_part_name, format_version))
conflicts.emplace_back(elem);
}
if (elem->type == LogEntry::CLEAR_COLUMN)
{
auto cur_part = MergeTreePartInfo::fromPartName(elem->new_part_name);
auto part = MergeTreePartInfo::fromPartName(entry.new_part_name);
auto cur_part = MergeTreePartInfo::fromPartName(elem->new_part_name, format_version);
auto part = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
if (part.partition_id == cur_part.partition_id)
conflicts.emplace_back(elem);
......@@ -523,12 +523,12 @@ bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_
/// A more complex check is whether another part is currently created by other action that will cover this part.
/// NOTE The above is redundant, but left for a more convenient message in the log.
auto result_part = MergeTreePartInfo::fromPartName(new_part_name);
auto result_part = MergeTreePartInfo::fromPartName(new_part_name, format_version);
/// It can slow down when the size of `future_parts` is large. But it can not be large, since `BackgroundProcessingPool` is limited.
for (const auto & future_part_name : future_parts)
{
auto future_part = MergeTreePartInfo::fromPartName(future_part_name);
auto future_part = MergeTreePartInfo::fromPartName(future_part_name, format_version);
if (future_part.contains(result_part))
{
......
......@@ -37,6 +37,8 @@ private:
using InsertsByTime = std::set<LogEntryPtr, ByTime>;
MergeTreeDataFormatVersion format_version;
String zookeeper_path;
String replica_path;
String logger_name;
......@@ -121,7 +123,11 @@ private:
};
public:
ReplicatedMergeTreeQueue() {}
ReplicatedMergeTreeQueue(MergeTreeDataFormatVersion format_version_)
: format_version(format_version_)
, virtual_parts(format_version)
{
}
void initialize(const String & zookeeper_path_, const String & replica_path_, const String & logger_name_,
const MergeTreeData::DataParts & parts, zkutil::ZooKeeperPtr zookeeper);
......
......@@ -497,12 +497,12 @@ void StorageMergeTree::attachPartition(const ASTPtr & query, const Field & field
else
{
LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
ActiveDataPartSet active_parts;
ActiveDataPartSet active_parts(data.format_version);
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
{
const String & name = it.name();
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(name, &part_info)
if (!MergeTreePartInfo::tryParsePartName(name, &part_info, data.format_version)
|| part_info.partition_id != partition_id)
{
continue;
......
......@@ -201,7 +201,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
settings_, database_name_ + "." + table_name, true, attach,
[this] (const std::string & name) { enqueuePartForCheck(name); },
[this] () { clearOldPartsAndRemoveFromZK(); }),
reader(data), writer(data), merger(data, context.getBackgroundPool()), fetcher(data), sharded_partition_uploader_client(*this),
reader(data), writer(data), merger(data, context.getBackgroundPool()), queue(data.format_version),
fetcher(data), sharded_partition_uploader_client(*this),
shutdown_event(false), part_check_thread(*this),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
{
......@@ -700,7 +701,7 @@ void StorageReplicatedMergeTree::createReplica()
/// Add to the queue jobs to receive all the active parts that the reference/master replica has.
Strings parts = zookeeper->getChildren(source_path + "/parts");
ActiveDataPartSet active_parts_set(parts);
ActiveDataPartSet active_parts_set(data.format_version, parts);
Strings active_parts = active_parts_set.getParts();
for (const String & name : active_parts)
......@@ -1260,7 +1261,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
{
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(quorum_path, quorum_stat.version));
auto part_info = MergeTreePartInfo::fromPartName(entry.new_part_name);
auto part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, data.format_version);
if (part_info.min_block != part_info.max_block)
throw Exception("Logical error: log entry with quorum for part covering more than one block number",
......@@ -1379,7 +1380,7 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
LOG_DEBUG(log, (entry.detach ? "Detaching" : "Removing") << " parts.");
size_t removed_parts = 0;
auto entry_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name);
auto entry_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, data.format_version);
/// Delete the parts contained in the range to be deleted.
/// It's important that no old parts remain (after the merge), because otherwise,
......@@ -1425,7 +1426,7 @@ void StorageReplicatedMergeTree::executeClearColumnInPartition(const LogEntry &
/// So, if conflicts are found, throw an exception and will retry execution later
queue.disableMergesAndFetchesInRange(entry);
auto entry_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name);
auto entry_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, data.format_version);
/// We don't change table structure, only data in some parts
/// To disable reading from these parts, we will sequentially acquire write lock for each part inside alterDataPart()
......@@ -1599,7 +1600,7 @@ namespace
ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.fromString(quorum_node_value);
auto part_info = MergeTreePartInfo::fromPartName(quorum_entry.part_name);
auto part_info = MergeTreePartInfo::fromPartName(quorum_entry.part_name, data.format_version);
if (part_info.min_block != part_info.max_block)
throw Exception("Logical error: part written with quorum covers more than one block numbers", ErrorCodes::LOGICAL_ERROR);
......@@ -1613,7 +1614,7 @@ namespace
String quorum_last_part;
if (zookeeper->tryGet(zookeeper_path + "/quorum/last_part", quorum_last_part) && quorum_last_part.empty() == false)
{
auto part_info = MergeTreePartInfo::fromPartName(quorum_last_part);
auto part_info = MergeTreePartInfo::fromPartName(quorum_last_part, data.format_version);
if (part_info.min_block != part_info.max_block)
throw Exception("Logical error: part written with quorum covers more than one block numbers", ErrorCodes::LOGICAL_ERROR);
......@@ -1970,9 +1971,11 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(const LogEntry
Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts");
for (const String & part_on_replica : parts)
{
if (part_on_replica == entry.new_part_name || MergeTreePartInfo::contains(part_on_replica, entry.new_part_name))
if (part_on_replica == entry.new_part_name
|| MergeTreePartInfo::contains(part_on_replica, entry.new_part_name, data.format_version))
{
if (largest_part_found.empty() || MergeTreePartInfo::contains(part_on_replica, largest_part_found))
if (largest_part_found.empty()
|| MergeTreePartInfo::contains(part_on_replica, largest_part_found, data.format_version))
{
largest_part_found = part_on_replica;
}
......@@ -2116,7 +2119,6 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(
part_name, replica_path, address.host, address.replication_port, to_detached);
if (!to_detached)
{
zkutil::Ops ops;
......@@ -2314,13 +2316,13 @@ BlockInputStreams StorageReplicatedMergeTree::read(
{
ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.fromString(quorum_str);
auto part_info = MergeTreePartInfo::fromPartName(quorum_entry.part_name);
auto part_info = MergeTreePartInfo::fromPartName(quorum_entry.part_name, data.format_version);
max_block_number_to_read = part_info.min_block - 1;
}
}
else
{
auto part_info = MergeTreePartInfo::fromPartName(last_part);
auto part_info = MergeTreePartInfo::fromPartName(last_part, data.format_version);
max_block_number_to_read = part_info.max_block;
}
}
......@@ -2567,16 +2569,22 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
/// The name of an imaginary part covering all possible parts in the specified partition with numbers in the range from zero to specified right bound.
static String getFakePartNameCoveringPartRange(const String & partition_id, UInt64 left, UInt64 right)
static String getFakePartNameCoveringPartRange(
MergeTreeDataFormatVersion format_version, const String & partition_id, UInt64 left, UInt64 right)
{
/// Artificial high level is choosen, to make this part "covering" all parts inside.
MergeTreePartInfo part_info(partition_id, left, right, 999999999);
if (format_version == 0)
{
/// The date range is all month long.
const auto & lut = DateLUT::instance();
time_t start_time = lut.YYYYMMDDToDate(parse<UInt32>(partition_id + "01"));
DayNum_t left_date = lut.toDayNum(start_time);
DayNum_t right_date = DayNum_t(static_cast<size_t>(left_date) + lut.daysInMonth(start_time) - 1);
/// Artificial high level is choosen, to make this part "covering" all parts inside.
return MergeTreePartInfo::getPartName(left_date, right_date, left, right, 999999999);
return part_info.getPartNameV0(left_date, right_date);
}
else
return part_info.getPartName();
}
......@@ -2606,7 +2614,7 @@ String StorageReplicatedMergeTree::getFakePartNameCoveringAllPartsInPartition(co
return {};
--right;
return getFakePartNameCoveringPartRange(partition_id, left, right);
return getFakePartNameCoveringPartRange(data.format_version, partition_id, left, right);
}
......@@ -2720,14 +2728,14 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & query, const Fie
else
{
LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
ActiveDataPartSet active_parts;
ActiveDataPartSet active_parts(data.format_version);
std::set<String> part_names;
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
{
String name = it.name();
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(name, &part_info))
if (!MergeTreePartInfo::tryParsePartName(name, &part_info, data.format_version))
continue;
if (part_info.partition_id != partition_id)
continue;
......@@ -3228,7 +3236,8 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const S
for (Poco::DirectoryIterator dir_it{data.getFullPath() + "detached/"}; dir_it != dir_end; ++dir_it)
{
MergeTreePartInfo part_info;
if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info) && part_info.partition_id == partition_id)
if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info, data.format_version)
&& part_info.partition_id == partition_id)
throw Exception("Detached partition " + partition_id + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
}
......@@ -3308,7 +3317,7 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const S
throw Exception("Too much retries to fetch parts from " + best_replica_path, ErrorCodes::TOO_MUCH_RETRIES_TO_FETCH_PARTS);
Strings parts = getZooKeeper()->getChildren(best_replica_path + "/parts");
ActiveDataPartSet active_parts_set(parts);
ActiveDataPartSet active_parts_set(data.format_version, parts);
Strings parts_to_fetch;
if (missing_parts.empty())
......@@ -3319,7 +3328,7 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const S
Strings parts_to_fetch_partition;
for (const String & part : parts_to_fetch)
{
if (MergeTreePartInfo::fromPartName(part).partition_id == partition_id)
if (MergeTreePartInfo::fromPartName(part, data.format_version).partition_id == partition_id)
parts_to_fetch_partition.push_back(part);
}
......
......@@ -243,13 +243,6 @@ private:
String replica_name;
String replica_path;
/** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/).
* In ZK entries in chronological order. Here it is not necessary.
*/
ReplicatedMergeTreeQueue queue;
std::atomic<time_t> last_queue_update_start_time{0};
std::atomic<time_t> last_queue_update_finish_time{0};
/** /replicas/me/is_active.
*/
zkutil::EphemeralNodeHolderPtr replica_is_active_node;
......@@ -275,6 +268,14 @@ private:
MergeTreeDataWriter writer;
MergeTreeDataMerger merger;
/** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/).
* In ZK entries in chronological order. Here it is not necessary.
*/
ReplicatedMergeTreeQueue queue;
std::atomic<time_t> last_queue_update_start_time{0};
std::atomic<time_t> last_queue_update_finish_time{0};
DataPartsExchange::Fetcher fetcher;
RemoteDiskSpaceMonitor::Client disk_space_monitor_client;
ShardedPartitionUploader::Client sharded_partition_uploader_client;
......
......@@ -9,7 +9,8 @@ int main(int argc, char ** argv)
for (DayNum_t date = today; DayNum_t(date + 10) > today; --date)
{
std::string name = DB::MergeTreePartInfo::getPartName(date, date, 0, 0, 0);
DB::MergeTreePartInfo part_info("partition", 0, 0, 0);
std::string name = part_info.getPartNameV0(date, date);
std::cerr << name << '\n';
time_t time = DateLUT::instance().YYYYMMDDToDate(DB::parse<UInt32>(name));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册