Commit c5b04f7f authored by Alex Zatelepin, committed by Vitaliy Lyudvichenko

Add info about total uncompressed part size to system tables (#2052)

* column size calculating routines now take into account multiple streams [#CLICKHOUSE-2831]

* rename for clarity

* use more appropriate method for calculating table size

* add info about total uncompressed part size to system.parts and system.parts_columns [#CLICKHOUSE-2831]

* rename columns for clarity [#CLICKHOUSE-2831]

* count shared substreams only once [#CLICKHOUSE-2831] (illustrated in the sketch below)
Parent ce5d9264
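The core of this change is the switch from per-file (`.bin`/`.mrk`) size lookups to enumerating a column's substreams, with a shared set ensuring that substreams used by several columns (such as the offsets stream of Nested subcolumns) are counted only once in totals. A minimal standalone sketch of the idea, with hypothetical names rather than the actual ClickHouse API:

```cpp
#include <cstddef>
#include <iostream>
#include <map>
#include <string>
#include <unordered_set>
#include <vector>

struct ColumnSize
{
    size_t marks = 0;
    size_t data_compressed = 0;
    size_t data_uncompressed = 0;

    void add(const ColumnSize & other)
    {
        marks += other.marks;
        data_compressed += other.data_compressed;
        data_uncompressed += other.data_uncompressed;
    }
};

struct FileChecksum
{
    size_t file_size = 0;
    size_t uncompressed_size = 0;
};

/// Sum the sizes of one column's substream files. If `processed` is given,
/// substreams already attributed to a previous column are skipped.
ColumnSize columnSize(
    const std::vector<std::string> & substreams,
    const std::map<std::string, FileChecksum> & files,
    std::unordered_set<std::string> * processed)
{
    ColumnSize size;
    for (const auto & stream : substreams)
    {
        if (processed && !processed->insert(stream).second)
            continue;  /// shared substream, already counted

        auto bin = files.find(stream + ".bin");
        if (bin != files.end())
        {
            size.data_compressed += bin->second.file_size;
            size.data_uncompressed += bin->second.uncompressed_size;
        }

        auto mrk = files.find(stream + ".mrk");
        if (mrk != files.end())
            size.marks += mrk->second.file_size;
    }
    return size;
}

int main()
{
    /// Two Nested subcolumns n.x and n.y sharing one offsets stream n.size0.
    std::map<std::string, FileChecksum> files = {
        {"n.size0.bin", {10, 40}}, {"n.size0.mrk", {2, 0}},
        {"n.x.bin", {100, 400}},   {"n.x.mrk", {4, 0}},
        {"n.y.bin", {200, 800}},   {"n.y.mrk", {4, 0}},
    };

    std::unordered_set<std::string> processed;
    ColumnSize totals;
    totals.add(columnSize({"n.size0", "n.x"}, files, &processed));
    totals.add(columnSize({"n.size0", "n.y"}, files, &processed));  /// n.size0 counted once

    std::cout << totals.data_compressed << '\n';  /// 310, not 320
}
```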
......@@ -108,7 +108,7 @@ bool PartLog::addNewPartToTheLog(Context & context, const MergeTreeDataPart & pa
elem.table_name = part.storage.getTableName();
elem.part_name = part.name;
elem.bytes_compressed_on_disk = part.size_in_bytes;
elem.bytes_compressed_on_disk = part.bytes_on_disk;
elem.rows = part.rows_count;
elem.error = static_cast<UInt16>(execution_status.code);
......
......@@ -102,8 +102,6 @@ void MergeTreeBlockSizePredictor::initialize(const Block & sample_block, const N
const String & column_name = column_with_type_and_name.name;
const ColumnPtr & column_data = column_with_type_and_name.column;
const auto column_checksum = data_part->tryGetBinChecksum(column_name);
if (!from_update && !names_set.count(column_name))
continue;
......@@ -122,8 +120,11 @@ void MergeTreeBlockSizePredictor::initialize(const Block & sample_block, const N
ColumnInfo info;
info.name = column_name;
/// If the column isn't fixed-size and has no checksum, then estimate from the in-memory sample
info.bytes_per_row_global = column_checksum
? column_checksum->uncompressed_size / number_of_rows_in_part
MergeTreeDataPart::ColumnSize column_size = data_part->getColumnSize(
column_name, *column_with_type_and_name.type);
info.bytes_per_row_global = column_size.data_uncompressed
? column_size.data_uncompressed / number_of_rows_in_part
: column_data->byteSize() / std::max<size_t>(1, column_data->size());
dynamic_columns_infos.emplace_back(info);
......
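For context, the predictor above now prefers the on-disk uncompressed size divided by the part's row count, and only falls back to the in-memory footprint of the sample column when no checksum information is available. A sketch of that fallback, with an illustrative signature rather than the real method:

```cpp
#include <algorithm>
#include <cstddef>

/// Estimate average bytes per row for a column.
double bytesPerRowEstimate(
    size_t data_uncompressed,        /// from part checksums; 0 if unknown
    size_t number_of_rows_in_part,
    size_t sample_byte_size,         /// in-memory size of the sample column
    size_t sample_rows)
{
    if (data_uncompressed && number_of_rows_in_part)
        return static_cast<double>(data_uncompressed) / number_of_rows_in_part;

    /// No checksum info: fall back to the sample block, guarding against empty columns.
    return static_cast<double>(sample_byte_size) / std::max<size_t>(1, sample_rows);
}
```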
......@@ -747,7 +747,7 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
for (auto & part : parts)
{
part_log_elem.part_name = part->name;
part_log_elem.bytes_compressed_on_disk = part->size_in_bytes;
part_log_elem.bytes_compressed_on_disk = part->bytes_on_disk;
part_log_elem.rows = part->rows_count;
part_log->add(part_log_elem);
......@@ -1211,8 +1211,8 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);
auto compression_settings = this->context.chooseCompressionSettings(
part->size_in_bytes,
static_cast<double>(part->size_in_bytes) / this->getTotalActiveSizeInBytes());
part->bytes_on_disk,
static_cast<double>(part->bytes_on_disk) / this->getTotalActiveSizeInBytes());
ExpressionBlockInputStream in(part_in, expression);
/** Don't write offsets for arrays, because ALTER never changes them
......@@ -1314,7 +1314,7 @@ void MergeTreeData::AlterDataPartTransaction::commit()
file.remove();
}
mutable_part.size_in_bytes = MergeTreeData::DataPart::calculateTotalSize(path);
mutable_part.bytes_on_disk = MergeTreeData::DataPart::calculateTotalSizeOnDisk(path);
/// TODO: we can skip resetting caches when the column is added.
data_part->storage.context.dropCaches();
......@@ -1674,7 +1674,7 @@ size_t MergeTreeData::getTotalActiveSizeInBytes() const
std::lock_guard<std::mutex> lock(data_parts_mutex);
for (auto & part : getDataPartsStateRange(DataPartState::Committed))
res += part->size_in_bytes;
res += part->bytes_on_disk;
}
return res;
......@@ -1832,59 +1832,37 @@ void MergeTreeData::calculateColumnSizesImpl()
void MergeTreeData::addPartContributionToColumnSizes(const DataPartPtr & part)
{
std::shared_lock<std::shared_mutex> lock(part->columns_lock);
const auto & files = part->checksums.files;
/// TODO This method doesn't take into account columns with multiple files.
for (const auto & column : getColumns().getAllPhysical())
for (const auto & column : part->columns)
{
const auto escaped_name = escapeForFileName(column.name);
const auto bin_file_name = escaped_name + ".bin";
const auto mrk_file_name = escaped_name + ".mrk";
ColumnSize & column_size = column_sizes[column.name];
if (files.count(bin_file_name))
{
const auto & bin_file_checksums = files.at(bin_file_name);
column_size.data_compressed += bin_file_checksums.file_size;
column_size.data_uncompressed += bin_file_checksums.uncompressed_size;
}
if (files.count(mrk_file_name))
column_size.marks += files.at(mrk_file_name).file_size;
DataPart::ColumnSize & total_column_size = column_sizes[column.name];
DataPart::ColumnSize part_column_size = part->getColumnSize(column.name, *column.type);
total_column_size.add(part_column_size);
}
}
static inline void logSubtract(size_t & from, size_t value, Logger * log, const String & variable)
{
if (value > from)
LOG_ERROR(log, "Possibly incorrect subtraction: " << from << " - " << value << " = " << from - value << ", variable " << variable);
from -= value;
}
void MergeTreeData::removePartContributionToColumnSizes(const DataPartPtr & part)
{
const auto & files = part->checksums.files;
std::shared_lock<std::shared_mutex> lock(part->columns_lock);
/// TODO This method doesn't take into account columns with multiple files.
for (const auto & column : getColumns().getAllPhysical())
for (const auto & column : part->columns)
{
const auto escaped_name = escapeForFileName(column.name);
const auto bin_file_name = escaped_name + ".bin";
const auto mrk_file_name = escaped_name + ".mrk";
DataPart::ColumnSize & total_column_size = column_sizes[column.name];
DataPart::ColumnSize part_column_size = part->getColumnSize(column.name, *column.type);
auto & column_size = column_sizes[column.name];
if (files.count(bin_file_name))
auto log_subtract = [&](size_t & from, size_t value, const char * field)
{
const auto & bin_file_checksums = files.at(bin_file_name);
logSubtract(column_size.data_compressed, bin_file_checksums.file_size, log, bin_file_name + ".file_size");
logSubtract(column_size.data_uncompressed, bin_file_checksums.uncompressed_size, log, bin_file_name + ".uncompressed_size");
}
if (value > from)
LOG_ERROR(log, "Possibly incorrect column size subtraction: "
<< from << " - " << value << " = " << from - value
<< ", column: " << column.name << ", field: " << field);
from -= value;
};
if (files.count(mrk_file_name))
logSubtract(column_size.marks, files.at(mrk_file_name).file_size, log, mrk_file_name + ".file_size");
log_subtract(total_column_size.data_compressed, part_column_size.data_compressed, ".data_compressed");
log_subtract(total_column_size.data_uncompressed, part_column_size.data_uncompressed, ".data_uncompressed");
log_subtract(total_column_size.marks, part_column_size.marks, ".marks");
}
}
......
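The `log_subtract` lambda above replaces the old free function `logSubtract` and guards against unsigned underflow when a part's contribution is removed from the column totals. The pattern in isolation, with logging stubbed out via `stderr`:

```cpp
#include <cstddef>
#include <iostream>

/// Subtract `value` from an unsigned counter, reporting (but not preventing)
/// an inconsistency that would otherwise wrap around silently.
void checkedSubtract(size_t & from, size_t value, const char * field)
{
    if (value > from)
        std::cerr << "Possibly incorrect subtraction: " << from << " - " << value
                  << ", field: " << field << '\n';
    from -= value;  /// wraps on underflow, but the error has already been logged
}
```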
......@@ -452,18 +452,6 @@ public:
/// Returns the size of partition in bytes.
size_t getPartitionSize(const std::string & partition_id) const;
struct ColumnSize
{
size_t marks = 0;
size_t data_compressed = 0;
size_t data_uncompressed = 0;
size_t getTotalCompressedSize() const
{
return marks + data_compressed;
}
};
size_t getColumnCompressedSize(const std::string & name) const
{
std::lock_guard<std::mutex> lock{data_parts_mutex};
......@@ -472,25 +460,13 @@ public:
return it == std::end(column_sizes) ? 0 : it->second.data_compressed;
}
using ColumnSizes = std::unordered_map<std::string, ColumnSize>;
ColumnSizes getColumnSizes() const
using ColumnSizeByName = std::unordered_map<std::string, DataPart::ColumnSize>;
ColumnSizeByName getColumnSizes() const
{
std::lock_guard<std::mutex> lock{data_parts_mutex};
return column_sizes;
}
/// NOTE Could be off after DROPped and MODIFYed columns in ALTER. Doesn't include primary.idx.
size_t getTotalCompressedSize() const
{
std::lock_guard<std::mutex> lock{data_parts_mutex};
size_t total_size = 0;
for (const auto & col : column_sizes)
total_size += col.second.getTotalCompressedSize();
return total_size;
}
/// Calculates column sizes in compressed form for the current state of data_parts.
void recalculateColumnSizes()
{
......@@ -555,7 +531,7 @@ private:
String full_path;
/// Current column sizes in compressed and uncompressed form.
ColumnSizes column_sizes;
ColumnSizeByName column_sizes;
/// Engine-specific methods
BrokenPartCallback broken_part_callback;
......
......@@ -180,7 +180,7 @@ bool MergeTreeDataMerger::selectPartsToMerge(
}
IMergeSelector::Part part_info;
part_info.size = part->size_in_bytes;
part_info.size = part->bytes_on_disk;
part_info.age = current_time - part->modification_time;
part_info.level = part->info.level;
part_info.data = &part;
......@@ -266,7 +266,7 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition(
return false;
}
sum_bytes += (*it)->size_in_bytes;
sum_bytes += (*it)->bytes_on_disk;
prev_it = it;
++it;
......@@ -532,7 +532,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
{
std::shared_lock<std::shared_mutex> part_lock(part->columns_lock);
merge_entry->total_size_bytes_compressed += part->size_in_bytes;
merge_entry->total_size_bytes_compressed += part->bytes_on_disk;
merge_entry->total_size_marks += part->marks_count;
}
......@@ -883,7 +883,7 @@ size_t MergeTreeDataMerger::estimateDiskSpaceForMerge(const MergeTreeData::DataP
{
size_t res = 0;
for (const MergeTreeData::DataPartPtr & part : parts)
res += part->size_in_bytes;
res += part->bytes_on_disk;
return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);
}
......
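All of the merger's size accounting now reads `bytes_on_disk`. The reservation estimate at the end simply scales the summed source-part sizes by a safety coefficient; a sketch assuming a coefficient of 1.1 (the actual `DISK_USAGE_COEFFICIENT_TO_RESERVE` is defined elsewhere in the MergeTree code):

```cpp
#include <cstddef>
#include <vector>

/// Rough upper bound on the disk space a merge may need.
size_t estimateDiskSpaceForMerge(const std::vector<size_t> & part_bytes_on_disk,
                                 double coefficient = 1.1)  /// assumed value
{
    size_t res = 0;
    for (size_t bytes : part_bytes_on_disk)
        res += bytes;
    return static_cast<size_t>(res * coefficient);
}
```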
......@@ -134,26 +134,51 @@ MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String & na
{
}
/// Returns the size of .bin file for column `name` if found, zero otherwise.
UInt64 MergeTreeDataPart::getColumnCompressedSize(const String & name) const
/// Takes into account the fact that several columns can e.g. share their .size substreams.
/// When calculating totals these should be counted only once.
MergeTreeDataPart::ColumnSize MergeTreeDataPart::getColumnSizeImpl(const String & name, const IDataType & type, std::unordered_set<String> * processed_substreams) const
{
const Checksum * checksum = tryGetBinChecksum(name);
ColumnSize size;
if (checksums.empty())
return size;
type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
String file_name = IDataType::getFileNameForStream(name, substream_path);
if (processed_substreams && !processed_substreams->insert(file_name).second)
return;
auto bin_checksum = checksums.files.find(file_name + ".bin");
if (bin_checksum != checksums.files.end())
{
size.data_compressed += bin_checksum->second.file_size;
size.data_uncompressed += bin_checksum->second.uncompressed_size;
}
auto mrk_checksum = checksums.files.find(file_name + ".mrk");
if (mrk_checksum != checksums.files.end())
size.marks += mrk_checksum->second.file_size;
}, {});
/// Probably a logic error, not sure if this can ever happen if checksums are not empty
return checksum ? checksum->file_size : 0;
return size;
}
UInt64 MergeTreeDataPart::getColumnUncompressedSize(const String & name) const
MergeTreeDataPart::ColumnSize MergeTreeDataPart::getColumnSize(const String & name, const IDataType & type) const
{
const Checksum * checksum = tryGetBinChecksum(name);
return checksum ? checksum->uncompressed_size : 0;
return getColumnSizeImpl(name, type, nullptr);
}
UInt64 MergeTreeDataPart::getColumnMrkSize(const String & name) const
MergeTreeDataPart::ColumnSize MergeTreeDataPart::getTotalColumnsSize() const
{
const Checksum * checksum = tryGetMrkChecksum(name);
return checksum ? checksum->file_size : 0;
ColumnSize totals;
std::unordered_set<String> processed_substreams;
for (const NameAndTypePair & column : columns)
{
ColumnSize size = getColumnSizeImpl(column.name, *column.type, &processed_substreams);
totals.add(size);
}
return totals;
}
......@@ -171,7 +196,7 @@ String MergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const
if (!hasColumnFiles(column.name))
continue;
const auto size = getColumnCompressedSize(column.name);
const auto size = getColumnSize(column.name, *column.type).data_compressed;
if (size < minimum_size)
{
minimum_size = size;
......@@ -251,7 +276,7 @@ MergeTreeDataPart::~MergeTreeDataPart()
}
}
UInt64 MergeTreeDataPart::calculateTotalSize(const String & from)
UInt64 MergeTreeDataPart::calculateTotalSizeOnDisk(const String & from)
{
Poco::File cur(from);
if (cur.isFile())
......@@ -260,7 +285,7 @@ UInt64 MergeTreeDataPart::calculateTotalSize(const String & from)
cur.list(files);
UInt64 res = 0;
for (const auto & file : files)
res += calculateTotalSize(from + file);
res += calculateTotalSizeOnDisk(from + file);
return res;
}
......@@ -420,7 +445,7 @@ void MergeTreeDataPart::loadIndex()
index.assign(std::make_move_iterator(loaded_index.begin()), std::make_move_iterator(loaded_index.end()));
}
size_in_bytes = calculateTotalSize(getFullPath());
bytes_on_disk = calculateTotalSizeOnDisk(getFullPath());
}
void MergeTreeDataPart::loadPartitionAndMinMaxIndex()
......@@ -484,20 +509,21 @@ void MergeTreeDataPart::loadRowsCount()
for (const NameAndTypePair & column : columns)
{
ColumnPtr column_col = column.type->createColumn();
const auto checksum = tryGetBinChecksum(column.name);
if (!column_col->isFixedAndContiguous())
continue;
/// Should be fixed non-nullable column
if (!checksum || !column_col->isFixedAndContiguous())
size_t column_size = getColumnSize(column.name, *column.type).data_uncompressed;
if (!column_size)
continue;
size_t sizeof_field = column_col->sizeOfValueIfFixed();
rows_count = checksum->uncompressed_size / sizeof_field;
rows_count = column_size / sizeof_field;
if (checksum->uncompressed_size % sizeof_field != 0)
if (column_size % sizeof_field != 0)
{
throw Exception(
"Column " + column.name + " has indivisible uncompressed size " + toString(checksum->uncompressed_size)
+ ", sizeof " + toString(sizeof_field),
"Uncompressed size of column " + column.name + "(" + toString(column_size)
+ ") is not divisible by the size of value (" + toString(sizeof_field) + ")",
ErrorCodes::LOGICAL_ERROR);
}
......@@ -670,29 +696,6 @@ bool MergeTreeDataPart::hasColumnFiles(const String & column) const
}
const MergeTreeDataPartChecksums::Checksum * MergeTreeDataPart::tryGetChecksum(const String & name, const String & ext) const
{
if (checksums.empty())
return nullptr;
const auto & files = checksums.files;
const auto file_name = escapeForFileName(name) + ext;
auto it = files.find(file_name);
return (it == files.end()) ? nullptr : &it->second;
}
const MergeTreeDataPartChecksums::Checksum * MergeTreeDataPart::tryGetBinChecksum(const String & name) const
{
return tryGetChecksum(name, ".bin");
}
const MergeTreeDataPartChecksums::Checksum * MergeTreeDataPart::tryGetMrkChecksum(const String & name) const
{
return tryGetChecksum(name, ".mrk");
}
UInt64 MergeTreeDataPart::getIndexSizeInBytes() const
{
UInt64 res = 0;
......@@ -709,18 +712,6 @@ UInt64 MergeTreeDataPart::getIndexSizeInAllocatedBytes() const
return res;
}
UInt64 MergeTreeDataPart::getTotalMrkSizeInBytes() const
{
UInt64 res = 0;
for (const NameAndTypePair & it : columns)
{
const Checksum * checksum = tryGetMrkChecksum(it.name);
if (checksum)
res += checksum->file_size;
}
return res;
}
String MergeTreeDataPart::stateToString(MergeTreeDataPart::State state)
{
switch (state)
......
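The `loadRowsCount` change above keeps the same arithmetic as before but sources the size from `getColumnSize()`: for a fixed-size column, the uncompressed byte size must divide evenly by the value size, and the quotient is the row count. A minimal model of that check (hypothetical helper, not the ClickHouse function):

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>

size_t rowsFromFixedColumn(const std::string & name, size_t uncompressed_bytes, size_t sizeof_field)
{
    if (uncompressed_bytes % sizeof_field != 0)
        throw std::logic_error(
            "Uncompressed size of column " + name + " (" + std::to_string(uncompressed_bytes)
            + ") is not divisible by the size of value (" + std::to_string(sizeof_field) + ")");
    return uncompressed_bytes / sizeof_field;
}
```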
......@@ -29,22 +29,31 @@ struct MergeTreeDataPart
MergeTreeDataPart(MergeTreeData & storage_, const String & name_);
const Checksum * tryGetChecksum(const String & name, const String & ext) const;
/// Returns checksum of column's binary file.
const Checksum * tryGetBinChecksum(const String & name) const;
/// Returns checksum of column's mrk file.
const Checksum * tryGetMrkChecksum(const String & name) const;
/// Returns the size of .bin file for column `name` if found, zero otherwise
UInt64 getColumnCompressedSize(const String & name) const;
UInt64 getColumnUncompressedSize(const String & name) const;
/// Returns the size of .mrk file for column `name` if found, zero otherwise
UInt64 getColumnMrkSize(const String & name) const;
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column.
String getColumnNameWithMinumumCompressedSize() const;
struct ColumnSize
{
size_t marks = 0;
size_t data_compressed = 0;
size_t data_uncompressed = 0;
void add(const ColumnSize & other)
{
marks += other.marks;
data_compressed += other.data_compressed;
data_uncompressed += other.data_uncompressed;
}
};
/// NOTE: Returns zeros if column files are not found in checksums.
/// NOTE: You must ensure that no ALTERs are in progress when calculating ColumnSizes.
/// (either by locking columns_lock, or by locking table structure).
ColumnSize getColumnSize(const String & name, const IDataType & type) const;
ColumnSize getTotalColumnsSize() const;
/// Returns full path to part dir
String getFullPath() const;
......@@ -68,8 +77,8 @@ struct MergeTreeDataPart
size_t rows_count = 0;
size_t marks_count = 0;
std::atomic<UInt64> size_in_bytes {0}; /// size in bytes, 0 - if not counted;
/// is used from several threads without locks (it is changed with ALTER).
std::atomic<UInt64> bytes_on_disk {0}; /// 0 - if not counted;
/// Is used from several threads without locks (it is changed with ALTER).
time_t modification_time = 0;
/// When the part is removed from the working set. Changes once.
mutable std::atomic<time_t> remove_time { std::numeric_limits<time_t>::max() };
......@@ -208,7 +217,7 @@ struct MergeTreeDataPart
~MergeTreeDataPart();
/// Calculate the total size of the entire directory with all the files
static UInt64 calculateTotalSize(const String & from);
static UInt64 calculateTotalSizeOnDisk(const String & from);
void remove() const;
......@@ -232,8 +241,6 @@ struct MergeTreeDataPart
/// For data in RAM ('index')
UInt64 getIndexSizeInBytes() const;
UInt64 getIndexSizeInAllocatedBytes() const;
/// Total size of *.mrk files
UInt64 getTotalMrkSizeInBytes() const;
private:
/// Reads columns names and types from columns.txt
......@@ -252,6 +259,8 @@ private:
void loadPartitionAndMinMaxIndex();
void checkConsistency(bool require_part_metadata);
ColumnSize getColumnSizeImpl(const String & name, const IDataType & type, std::unordered_set<String> * processed_substreams) const;
};
......
......@@ -214,7 +214,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows());
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterUncompressedBytes, block.bytes());
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->size_in_bytes);
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->bytes_on_disk);
return new_data_part;
}
......
......@@ -347,7 +347,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
new_part->columns = *total_column_list;
new_part->index.assign(std::make_move_iterator(index_columns.begin()), std::make_move_iterator(index_columns.end()));
new_part->checksums = checksums;
new_part->size_in_bytes = MergeTreeData::DataPart::calculateTotalSize(new_part->getFullPath());
new_part->bytes_on_disk = MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_part->getFullPath());
}
void MergedBlockOutputStream::init()
......
......@@ -634,7 +634,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
auto part = data.getPartIfExists(name, {MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
if (part)
sum_parts_size_in_bytes += part->size_in_bytes;
sum_parts_size_in_bytes += part->bytes_on_disk;
}
if (merger.merges_blocker.isCancelled())
......
......@@ -116,7 +116,7 @@ BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Set
bool StorageMergeTree::checkTableCanBeDropped() const
{
const_cast<MergeTreeData &>(getData()).recalculateColumnSizes();
context.checkTableCanBeDropped(database_name, table_name, getData().getTotalCompressedSize());
context.checkTableCanBeDropped(database_name, table_name, getData().getTotalActiveSizeInBytes());
return true;
}
......@@ -334,7 +334,7 @@ bool StorageMergeTree::merge(
part_log_elem.part_name = future_part.name;
if (new_part)
part_log_elem.bytes_compressed_on_disk = new_part->size_in_bytes;
part_log_elem.bytes_compressed_on_disk = new_part->bytes_on_disk;
part_log_elem.source_part_names.reserve(future_part.parts.size());
for (const auto & source_part : future_part.parts)
......
......@@ -1142,11 +1142,11 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
/// If the entry is old enough and the source parts are large enough, and the merged part exists on some replica,
/// then prefer fetching the merged part from that replica.
size_t sum_parts_size_in_bytes = 0;
size_t sum_parts_bytes_on_disk = 0;
for (const auto & part : parts)
sum_parts_size_in_bytes += part->size_in_bytes;
sum_parts_bytes_on_disk += part->bytes_on_disk;
if (sum_parts_size_in_bytes >= data.settings.prefer_fetch_merged_part_size_threshold)
if (sum_parts_bytes_on_disk >= data.settings.prefer_fetch_merged_part_size_threshold)
{
String replica = findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove.
if (!replica.empty())
......@@ -1205,7 +1205,7 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
part_log_elem.part_name = entry.new_part_name;
if (part)
part_log_elem.bytes_compressed_on_disk = part->size_in_bytes;
part_log_elem.bytes_compressed_on_disk = part->bytes_on_disk;
part_log_elem.source_part_names.reserve(parts.size());
for (const auto & source_part : parts)
......@@ -2293,7 +2293,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
if (part)
{
part_log_elem.bytes_compressed_on_disk = part->size_in_bytes;
part_log_elem.bytes_compressed_on_disk = part->bytes_on_disk;
part_log_elem.rows = part->rows_count; /// Could be approximate (?)
}
......@@ -2949,7 +2949,7 @@ bool StorageReplicatedMergeTree::checkTableCanBeDropped() const
{
/// Consider only synchronized data
const_cast<MergeTreeData &>(getData()).recalculateColumnSizes();
context.checkTableCanBeDropped(database_name, table_name, getData().getTotalCompressedSize());
context.checkTableCanBeDropped(database_name, table_name, getData().getTotalActiveSizeInBytes());
return true;
}
......
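The fetch-vs-merge decision above sums `bytes_on_disk` over the source parts and prefers fetching the already-merged part from a peer replica once the sum crosses a threshold. A sketch of the decision, with an illustrative threshold (the real value comes from the `prefer_fetch_merged_part_size_threshold` setting):

```cpp
#include <cstddef>
#include <vector>

bool preferFetchOverMerge(const std::vector<size_t> & source_part_bytes_on_disk,
                          size_t threshold = 10ULL * 1024 * 1024 * 1024)  /// assumed 10 GiB
{
    size_t sum = 0;
    for (size_t bytes : source_part_bytes_on_disk)
        sum += bytes;
    return sum >= threshold;
}
```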
......@@ -114,7 +114,7 @@ BlockInputStreams StorageSystemColumns::read(
NamesAndTypesList columns;
ColumnDefaults column_defaults;
MergeTreeData::ColumnSizes column_sizes;
MergeTreeData::ColumnSizeByName column_sizes;
{
StoragePtr storage = storages.at(std::make_pair(database_name, table_name));
......
......@@ -17,28 +17,30 @@ namespace DB
StorageSystemParts::StorageSystemParts(const std::string & name)
: StorageSystemPartsBase(name,
{
{"partition", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeString>()},
{"active", std::make_shared<DataTypeUInt8>()},
{"marks", std::make_shared<DataTypeUInt64>()},
{"marks_size", std::make_shared<DataTypeUInt64>()},
{"rows", std::make_shared<DataTypeUInt64>()},
{"bytes", std::make_shared<DataTypeUInt64>()},
{"modification_time", std::make_shared<DataTypeDateTime>()},
{"remove_time", std::make_shared<DataTypeDateTime>()},
{"refcount", std::make_shared<DataTypeUInt32>()},
{"min_date", std::make_shared<DataTypeDate>()},
{"max_date", std::make_shared<DataTypeDate>()},
{"min_block_number", std::make_shared<DataTypeInt64>()},
{"max_block_number", std::make_shared<DataTypeInt64>()},
{"level", std::make_shared<DataTypeUInt32>()},
{"primary_key_bytes_in_memory", std::make_shared<DataTypeUInt64>()},
{"primary_key_bytes_in_memory_allocated", std::make_shared<DataTypeUInt64>()},
{"partition", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeString>()},
{"active", std::make_shared<DataTypeUInt8>()},
{"marks", std::make_shared<DataTypeUInt64>()},
{"rows", std::make_shared<DataTypeUInt64>()},
{"bytes_on_disk", std::make_shared<DataTypeUInt64>()},
{"data_compressed_bytes", std::make_shared<DataTypeUInt64>()},
{"data_uncompressed_bytes", std::make_shared<DataTypeUInt64>()},
{"marks_bytes", std::make_shared<DataTypeUInt64>()},
{"modification_time", std::make_shared<DataTypeDateTime>()},
{"remove_time", std::make_shared<DataTypeDateTime>()},
{"refcount", std::make_shared<DataTypeUInt32>()},
{"min_date", std::make_shared<DataTypeDate>()},
{"max_date", std::make_shared<DataTypeDate>()},
{"min_block_number", std::make_shared<DataTypeInt64>()},
{"max_block_number", std::make_shared<DataTypeInt64>()},
{"level", std::make_shared<DataTypeUInt32>()},
{"primary_key_bytes_in_memory", std::make_shared<DataTypeUInt64>()},
{"primary_key_bytes_in_memory_allocated", std::make_shared<DataTypeUInt64>()},
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
{"path", std::make_shared<DataTypeString>()}
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
{"path", std::make_shared<DataTypeString>()},
}
)
{
......@@ -52,6 +54,7 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns, const Stor
{
const auto & part = info.all_parts[part_number];
auto part_state = info.all_parts_state[part_number];
MergeTreeDataPart::ColumnSize columns_size = part->getTotalColumnsSize();
size_t i = 0;
{
......@@ -62,9 +65,11 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns, const Stor
columns[i++]->insert(part->name);
columns[i++]->insert(static_cast<UInt64>(part_state == State::Committed));
columns[i++]->insert(static_cast<UInt64>(part->marks_count));
columns[i++]->insert(static_cast<UInt64>(part->getTotalMrkSizeInBytes()));
columns[i++]->insert(static_cast<UInt64>(part->rows_count));
columns[i++]->insert(static_cast<UInt64>(part->size_in_bytes));
columns[i++]->insert(static_cast<UInt64>(part->bytes_on_disk));
columns[i++]->insert(static_cast<UInt64>(columns_size.data_compressed));
columns[i++]->insert(static_cast<UInt64>(columns_size.data_uncompressed));
columns[i++]->insert(static_cast<UInt64>(columns_size.marks));
columns[i++]->insert(static_cast<UInt64>(part->modification_time));
time_t remove_time = part->remove_time.load(std::memory_order_relaxed);
......
......@@ -11,6 +11,7 @@
#include <Storages/VirtualColumnUtils.h>
#include <Databases/IDatabase.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTIdentifier.h>
namespace DB
......@@ -160,7 +161,7 @@ public:
try
{
/// For table not to be dropped.
/// For table not to be dropped and set of columns to remain constant.
info.table_lock = info.storage->lockStructure(false, __PRETTY_FUNCTION__);
}
catch (const Exception & e)
......@@ -280,7 +281,31 @@ bool StorageSystemPartsBase::hasColumn(const String & column_name) const
StorageSystemPartsBase::StorageSystemPartsBase(std::string name_, NamesAndTypesList && columns_)
: name(std::move(name_))
{
setColumns(ColumnsDescription(std::move(columns_)));
NamesAndTypesList aliases;
ColumnDefaults defaults;
auto add_alias = [&](const String & alias_name, const String & column_name)
{
DataTypePtr type;
for (const NameAndTypePair & col : columns_)
{
if (col.name == column_name)
{
type = col.type;
break;
}
}
if (!type)
throw Exception("No column " + column_name + " in table system." + name, ErrorCodes::LOGICAL_ERROR);
aliases.push_back({alias_name, type});
defaults[alias_name] = ColumnDefault{ColumnDefaultKind::Alias, std::make_shared<ASTIdentifier>(column_name)};
};
/// Add aliases for old column names for backwards compatibility.
add_alias("bytes", "bytes_on_disk");
add_alias("marks_size", "marks_bytes");
setColumns(ColumnsDescription(std::move(columns_), {}, std::move(aliases), std::move(defaults)));
}
}
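The renamed columns would break existing queries against the system tables, so the constructor above registers the old names (`bytes`, `marks_size`) as ALIAS columns resolving to the new ones. A toy model of that name resolution, just to show the mapping (not the actual ClickHouse machinery):

```cpp
#include <iostream>
#include <string>
#include <unordered_map>

int main()
{
    /// Old column name -> new column name, mirroring the add_alias calls above.
    const std::unordered_map<std::string, std::string> aliases = {
        {"bytes", "bytes_on_disk"},
        {"marks_size", "marks_bytes"},
    };

    std::string requested = "bytes";  /// old name used by a pre-existing query
    auto it = aliases.find(requested);
    std::cout << (it != aliases.end() ? it->second : requested) << '\n';  /// prints bytes_on_disk
}
```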
......@@ -16,39 +16,43 @@ namespace DB
StorageSystemPartsColumns::StorageSystemPartsColumns(const std::string & name)
: StorageSystemPartsBase(name,
{
{"partition", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeString>()},
{"active", std::make_shared<DataTypeUInt8>()},
{"marks", std::make_shared<DataTypeUInt64>()},
{"marks_bytes_in_block", std::make_shared<DataTypeUInt64>()},
{"rows", std::make_shared<DataTypeUInt64>()},
{"bytes", std::make_shared<DataTypeUInt64>()},
{"modification_time", std::make_shared<DataTypeDateTime>()},
{"remove_time", std::make_shared<DataTypeDateTime>()},
{"refcount", std::make_shared<DataTypeUInt32>()},
{"min_date", std::make_shared<DataTypeDate>()},
{"max_date", std::make_shared<DataTypeDate>()},
{"min_block_number", std::make_shared<DataTypeInt64>()},
{"max_block_number", std::make_shared<DataTypeInt64>()},
{"level", std::make_shared<DataTypeUInt32>()},
{"primary_key_bytes_in_memory", std::make_shared<DataTypeUInt64>()},
{"primary_key_bytes_in_memory_allocated", std::make_shared<DataTypeUInt64>()},
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
{"path", std::make_shared<DataTypeString>()},
{"column", std::make_shared<DataTypeString>()},
{"type", std::make_shared<DataTypeString>() },
{"default_kind", std::make_shared<DataTypeString>() },
{"default_expression", std::make_shared<DataTypeString>() },
{"data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
{"data_uncompressed_bytes", std::make_shared<DataTypeUInt64>() },
{"marks_bytes_in_column", std::make_shared<DataTypeUInt64>() },
}
)
: StorageSystemPartsBase(name,
{
{"partition", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeString>()},
{"active", std::make_shared<DataTypeUInt8>()},
{"marks", std::make_shared<DataTypeUInt64>()},
{"rows", std::make_shared<DataTypeUInt64>()},
{"bytes_on_disk", std::make_shared<DataTypeUInt64>()},
{"data_compressed_bytes", std::make_shared<DataTypeUInt64>()},
{"data_uncompressed_bytes", std::make_shared<DataTypeUInt64>()},
{"marks_bytes", std::make_shared<DataTypeUInt64>()},
{"modification_time", std::make_shared<DataTypeDateTime>()},
{"remove_time", std::make_shared<DataTypeDateTime>()},
{"refcount", std::make_shared<DataTypeUInt32>()},
{"min_date", std::make_shared<DataTypeDate>()},
{"max_date", std::make_shared<DataTypeDate>()},
{"min_block_number", std::make_shared<DataTypeInt64>()},
{"max_block_number", std::make_shared<DataTypeInt64>()},
{"level", std::make_shared<DataTypeUInt32>()},
{"primary_key_bytes_in_memory", std::make_shared<DataTypeUInt64>()},
{"primary_key_bytes_in_memory_allocated", std::make_shared<DataTypeUInt64>()},
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
{"path", std::make_shared<DataTypeString>()},
{"column", std::make_shared<DataTypeString>()},
{"type", std::make_shared<DataTypeString>() },
{"default_kind", std::make_shared<DataTypeString>() },
{"default_expression", std::make_shared<DataTypeString>() },
{"column_bytes_on_disk", std::make_shared<DataTypeUInt64>() },
{"column_data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
{"column_data_uncompressed_bytes", std::make_shared<DataTypeUInt64>() },
{"column_marks_bytes", std::make_shared<DataTypeUInt64>() },
}
)
{
}
......@@ -84,8 +88,8 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns, con
{
const auto & part = info.all_parts[part_number];
auto part_state = info.all_parts_state[part_number];
auto columns_size = part->getTotalColumnsSize();
auto total_mrk_size_in_bytes = part->getTotalMrkSizeInBytes();
/// For convenience, the returned refcount does not include references due to local variables in this method: all_parts, active_parts.
auto use_count = part.use_count() - 1;
auto min_date = part->getMinDate();
......@@ -106,10 +110,12 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns, con
columns[j++]->insert(part->name);
columns[j++]->insert(static_cast<UInt64>(part_state == State::Committed));
columns[j++]->insert(static_cast<UInt64>(part->marks_count));
columns[j++]->insert(static_cast<UInt64>(total_mrk_size_in_bytes));
columns[j++]->insert(static_cast<UInt64>(part->rows_count));
columns[j++]->insert(static_cast<UInt64>(part->size_in_bytes));
columns[j++]->insert(static_cast<UInt64>(part->bytes_on_disk));
columns[j++]->insert(static_cast<UInt64>(columns_size.data_compressed));
columns[j++]->insert(static_cast<UInt64>(columns_size.data_uncompressed));
columns[j++]->insert(static_cast<UInt64>(columns_size.marks));
columns[j++]->insert(static_cast<UInt64>(part->modification_time));
columns[j++]->insert(static_cast<UInt64>(part->remove_time.load(std::memory_order_relaxed)));
......@@ -142,9 +148,11 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns, con
columns[j++]->insertDefault();
}
columns[j++]->insert(static_cast<UInt64>(part->getColumnCompressedSize(column.name)));
columns[j++]->insert(static_cast<UInt64>(part->getColumnUncompressedSize(column.name)));
columns[j++]->insert(static_cast<UInt64>(part->getColumnMrkSize(column.name)));
MergeTreeDataPart::ColumnSize column_size = part->getColumnSize(column.name, *column.type);
columns[j++]->insert(static_cast<UInt64>(column_size.data_compressed + column_size.marks));
columns[j++]->insert(static_cast<UInt64>(column_size.data_compressed));
columns[j++]->insert(static_cast<UInt64>(column_size.data_uncompressed));
columns[j++]->insert(static_cast<UInt64>(column_size.marks));
if (has_state_column)
columns[j++]->insert(part->stateString());
......