提交 9cd28326 编写于 作者: A alesapin

Better size calculation

上级 fd7be934
......@@ -229,7 +229,10 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns)
column_name_to_position.reserve(new_columns.size());
size_t pos = 0;
for (const auto & column : columns)
{
column_name_to_position.emplace(column.name, pos++);
}
total_columns_size = getTotalColumnsSize();
}
/// The destructor is defaulted here, out of line from its declaration in the class.
IMergeTreeDataPart::~IMergeTreeDataPart() = default;
......@@ -406,6 +409,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
loadColumns(require_columns_checksums);
loadChecksums(require_columns_checksums);
calculateColumnsSizesOnDisk();
loadIndexGranularity();
loadIndex(); /// Must be called after loadIndexGranularity as it uses the value of `index_granularity`
loadRowsCount(); /// Must be called after loadIndex() as it uses the value of `index_granularity`.
......@@ -870,6 +874,31 @@ void IMergeTreeDataPart::checkConsistencyBase() const
}
}
void IMergeTreeDataPart::calculateColumnsSizesOnDisk()
{
if (getColumns().empty() || checksums.empty())
throw Exception("Cannot calculate columns sizes when columns or checksums are not initialized", ErrorCodes::LOGICAL_ERROR);
calculateEachColumnSizesOnDisk(columns_sizes, total_columns_size);
}
/// Returns the cached size of the column named `column_name`; the type argument is unused.
/// For some types of parts per-column sizes may not be calculated: when nothing is cached
/// for the column, a default-constructed ColumnSize is returned.
ColumnSize IMergeTreeDataPart::getColumnSize(const String & column_name, const IDataType & /* type */) const
{
    const auto found = columns_sizes.find(column_name);
    if (found == columns_sizes.end())
        return ColumnSize{};

    return found->second;
}
/// Adds the compressed data size of every column cached in `columns_sizes` to the
/// matching entry of `column_to_size`.
///
/// Sizes are added to existing entries rather than assigned, so the same map can be passed
/// for several parts to obtain per-column sums across all of them. A column that is not yet
/// in the map starts from zero, because std::map::operator[] value-initializes new entries.
void IMergeTreeDataPart::accumulateColumnSizes(ColumnToSize & column_to_size) const
{
    for (const auto & [column_name, column_size] : columns_sizes)
        column_to_size[column_name] += column_size.data_compressed;
}
bool isCompactPart(const MergeTreeDataPartPtr & data_part)
{
return (data_part && data_part->getType() == MergeTreeDataPartType::COMPACT);
......
......@@ -92,18 +92,16 @@ public:
/// Whether vertical merge is supported; this default implementation returns false.
virtual bool supportsVerticalMerge() const { return false; }
/// NOTE: Returns zeros if column files are not found in checksums.
/// NOTE: You must ensure that no ALTERs are in progress when calculating ColumnSizes.
/// (by locking table structure).
virtual ColumnSize getColumnSize(const String & /* name */, const IDataType & /* type */) const { return {}; }
ColumnSize getColumnSize(const String & column_name, const IDataType & /* type */) const;
virtual ColumnSize getTotalColumnsSize() const { return {}; }
ColumnSize getTotalColumnsSize() const { return total_columns_size; }
virtual String getFileNameForColumn(const NameAndTypePair & column) const = 0;
virtual ~IMergeTreeDataPart();
using ColumnToSize = std::map<std::string, UInt64>;
virtual void accumulateColumnSizes(ColumnToSize & /* column_to_size */) const {}
void accumulateColumnSizes(ColumnToSize & /* column_to_size */) const;
/// Returns the type of this part (the value of `part_type`).
Type getType() const { return part_type; }
......@@ -159,7 +157,6 @@ public:
size_t rows_count = 0;
std::atomic<UInt64> bytes_on_disk {0}; /// 0 - if not counted;
/// Is used from several threads without locks (it is changed with ALTER).
/// May not contain size of checksums.txt and columns.txt
time_t modification_time = 0;
......@@ -295,9 +292,16 @@ public:
/// Default implementation: ignores both arguments and returns false.
virtual bool hasColumnFiles(const String & /* column */, const IDataType & /* type */) const{ return false; }
static UInt64 calculateTotalSizeOnDisk(const DiskPtr & disk_, const String & from);
void calculateColumnsSizesOnDisk();
protected:
/// Columns description.
/// Total size of all columns, calculated once in calculateColumnsSizesOnDisk
ColumnSize total_columns_size;
/// Size of each column, calculated once in calculateColumnsSizesOnDisk
ColumnSizeByName columns_sizes;
/// Columns description. Cannot be changed after part initialization.
NamesAndTypesList columns;
const Type part_type;
......@@ -306,6 +310,10 @@ protected:
virtual void checkConsistency(bool require_part_metadata) const = 0;
void checkConsistencyBase() const;
/// Fill each_columns_size and total_size with sizes from columns files on
/// disk using columns and checksums.
virtual void calculateEachColumnSizesOnDisk(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const = 0;
private:
/// In compact parts the order of columns matters
NameToPosition column_name_to_position;
......
......@@ -1564,7 +1564,7 @@ void MergeTreeDataMergerMutator::finalizeMutatedPart(
new_data_part->modification_time = time(nullptr);
new_data_part->bytes_on_disk
= MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->disk, new_data_part->getFullRelativePath());
new_data_part->calculateColumnsSizesOnDisk();
}
......
......@@ -73,9 +73,9 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
default_codec, writer_settings, computed_index_granularity);
}
ColumnSize MergeTreeDataPartCompact::getTotalColumnsSize() const
void MergeTreeDataPartCompact::calculateEachColumnSizesOnDisk(ColumnSizeByName & /*each_columns_size*/, ColumnSize & total_size) const
{
ColumnSize total_size;
auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION);
if (bin_checksum != checksums.files.end())
{
......@@ -86,8 +86,6 @@ ColumnSize MergeTreeDataPartCompact::getTotalColumnsSize() const
auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + index_granularity_info.marks_file_extension);
if (mrk_checksum != checksums.files.end())
total_size.marks += mrk_checksum->second.file_size;
return total_size;
}
void MergeTreeDataPartCompact::loadIndexGranularity()
......
......@@ -54,8 +54,6 @@ public:
/// This override always returns true.
bool isStoredOnDisk() const override { return true; }
ColumnSize getTotalColumnsSize() const override;
bool hasColumnFiles(const String & column_name, const IDataType & type) const override;
/// Returns DATA_FILE_NAME regardless of which column is passed in.
String getFileNameForColumn(const NameAndTypePair & /* column */) const override { return DATA_FILE_NAME; }
......@@ -67,6 +65,9 @@ private:
/// Loads marks index granularity into memory
void loadIndexGranularity() override;
/// Compact parts don't support per-column sizes, only the total size
void calculateEachColumnSizesOnDisk(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override;
};
}
......@@ -96,23 +96,6 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
return size;
}
/// Returns the sum of the sizes of all columns of this part.
/// One set of processed substreams is shared by the getColumnSizeImpl() calls for all columns
/// (presumably so that a substream shared by several columns is counted once - confirm in
/// getColumnSizeImpl).
ColumnSize MergeTreeDataPartWide::getTotalColumnsSize() const
{
    std::unordered_set<String> seen_substreams;
    ColumnSize sum;

    for (const auto & name_and_type : columns)
        sum.add(getColumnSizeImpl(name_and_type.name, *name_and_type.type, &seen_substreams));

    return sum;
}
/// Returns the size of a single column, as computed by getColumnSizeImpl() when no set of
/// processed substreams is supplied.
ColumnSize MergeTreeDataPartWide::getColumnSize(const String & column_name, const IDataType & type) const
{
    std::unordered_set<String> * const no_processed_substreams = nullptr;
    return getColumnSizeImpl(column_name, type, no_processed_substreams);
}
void MergeTreeDataPartWide::loadIndexGranularity()
{
String full_path = getFullRelativePath();
......@@ -157,20 +140,6 @@ MergeTreeDataPartWide::~MergeTreeDataPartWide()
removeIfNeeded();
}
/// For every physical column of the storage, enumerates the streams of its type and, for each
/// stream whose ".bin" file exists on the disk of this part, adds the size of that file to
/// `column_to_size[<column name>]`.
void MergeTreeDataPartWide::accumulateColumnSizes(ColumnToSize & column_to_size) const
{
    for (const auto & physical_column : storage.getColumns().getAllPhysical())
    {
        IDataType::SubstreamPath path;

        /// Called once per substream of the column's type.
        auto add_stream_size = [&](const IDataType::SubstreamPath & substream_path)
        {
            const auto stream_file_name = IDataType::getFileNameForStream(physical_column.name, substream_path);
            const auto bin_file_path = getFullRelativePath() + stream_file_name + ".bin";

            /// Streams without a data file contribute nothing.
            if (!disk->exists(bin_file_path))
                return;

            column_to_size[physical_column.name] += disk->getFileSize(bin_file_path);
        };

        physical_column.type->enumerateStreams(add_stream_size, path);
    }
}
void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
{
checkConsistencyBase();
......@@ -258,4 +227,15 @@ String MergeTreeDataPartWide::getFileNameForColumn(const NameAndTypePair & colum
return filename;
}
/// For every column of this part, stores its size (as computed by getColumnSizeImpl) into
/// `each_columns_size` under the column name and adds it to `total_size`.
/// Neither output argument is reset here: `total_size` is only added to.
void MergeTreeDataPartWide::calculateEachColumnSizesOnDisk(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const
{
    /// One set shared by the getColumnSizeImpl() calls for all columns.
    std::unordered_set<String> seen_substreams;

    for (const auto & name_and_type : columns)
    {
        const auto & column_name = name_and_type.name;
        const ColumnSize column_size = getColumnSizeImpl(column_name, *name_and_type.type, &seen_substreams);

        each_columns_size[column_name] = column_size;
        total_size.add(column_size);
    }
}
}
......@@ -48,14 +48,8 @@ public:
/// This override always returns true.
bool supportsVerticalMerge() const override { return true; }
void accumulateColumnSizes(ColumnToSize & column_to_size) const override;
String getFileNameForColumn(const NameAndTypePair & column) const override;
ColumnSize getTotalColumnsSize() const override;
ColumnSize getColumnSize(const String & column_name, const IDataType & type) const override;
~MergeTreeDataPartWide() override;
bool hasColumnFiles(const String & column, const IDataType & type) const override;
......@@ -67,6 +61,8 @@ private:
void loadIndexGranularity() override;
ColumnSize getColumnSizeImpl(const String & name, const IDataType & type, std::unordered_set<String> * processed_substreams) const;
void calculateEachColumnSizesOnDisk(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override;
};
}
......@@ -143,6 +143,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
new_part->checksums = checksums;
new_part->bytes_on_disk = checksums.getTotalSizeOnDisk();
new_part->index_granularity = writer->getIndexGranularity();
new_part->calculateColumnsSizesOnDisk();
}
void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册