Commit 27978739 authored by CurtizJ

code cleanup

Parent 7a549b27
......@@ -787,16 +787,6 @@ void IMergeTreeDataPart::remove() const
}
}
void IMergeTreeDataPart::accumulateColumnSizes(ColumnToSize & /* column_to_size */) const
{
throw Exception("Method 'accumulateColumnSizes' is not supported for data part with type " + typeToString(getType()), ErrorCodes::NOT_IMPLEMENTED);
}
void IMergeTreeDataPart::checkConsistency(bool /* require_part_metadata */) const
{
throw Exception("Method 'checkConsistency' is not supported for data part with type " + typeToString(getType()), ErrorCodes::NOT_IMPLEMENTED);
}
String IMergeTreeDataPart::typeToString(Type type)
{
switch (type)
......
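Taken together with the header hunk below, this removes the throwing stubs from the base class: accumulateColumnSizes now has an empty default body in the header, and checkConsistency becomes pure virtual so every concrete part type must implement it. A minimal standalone sketch of that before/after pattern (std::runtime_error stands in for ClickHouse's Exception, and the names are simplified):

#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>

using ColumnToSize = std::map<std::string, uint64_t>;

/// Before: the base class carried stubs that threw NOT_IMPLEMENTED.
struct IPartBefore
{
    virtual ~IPartBefore() = default;
    virtual void accumulateColumnSizes(ColumnToSize &) const
    {
        throw std::runtime_error("'accumulateColumnSizes' is not supported for this part type");
    }
    virtual void checkConsistency(bool /* require_part_metadata */) const
    {
        throw std::runtime_error("'checkConsistency' is not supported for this part type");
    }
};

/// After: the optional hook defaults to a harmless no-op, while the
/// mandatory check is pure virtual and must be overridden.
struct IPartAfter
{
    virtual ~IPartAfter() = default;
    virtual void accumulateColumnSizes(ColumnToSize & /* column_to_size */) const {}
    virtual void checkConsistency(bool require_part_metadata) const = 0;
};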
......@@ -81,31 +81,16 @@ public:
virtual ColumnSize getTotalColumnsSize() const { return {}; }
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column.
String getColumnNameWithMinumumCompressedSize() const;
virtual String getFileNameForColumn(const NameAndTypePair & column) const = 0;
void setColumns(const NamesAndTypesList & columns_);
virtual NameToNameMap createRenameMapForAlter(
AlterAnalysisResult & /* analysis_result */,
const NamesAndTypesList & /* old_columns */) const { return {}; }
virtual ~IMergeTreeDataPart();
// virtual Checksums check(
// bool require_checksums,
// const DataTypes & primary_key_data_types, /// Check the primary key. If it is not necessary, pass an empty array.
// const MergeTreeIndices & indices = {}, /// Check skip indices
// std::function<bool()> is_cancelled = []{ return false; })
// {
// return {};
// }
using ColumnToSize = std::map<std::string, UInt64>;
virtual void accumulateColumnSizes(ColumnToSize & column_to_size) const;
virtual void accumulateColumnSizes(ColumnToSize & /* column_to_size */) const {}
using Type = MergeTreeDataPartType;
Type getType() const { return part_type; }
......@@ -128,6 +113,10 @@ public:
const std::optional<String> & relative_path,
Type part_type_);
void setColumns(const NamesAndTypesList & new_columns);
const NamesAndTypesList & getColumns() const { return columns; }
void assertOnDisk() const;
void remove() const;
......@@ -142,9 +131,12 @@ public:
/// This is useful when you want to change e.g. block numbers or the mutation version of the part.
String getNewName(const MergeTreePartInfo & new_part_info) const;
// Block sample_block;
std::optional<size_t> getColumnPosition(const String & column_name) const;
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column.
String getColumnNameWithMinumumCompressedSize() const;
bool contains(const IMergeTreeDataPart & other) const { return info.contains(other.info); }
/// If the partition key includes date column (a common case), these functions will return min and max values for this column.
......@@ -284,9 +276,6 @@ public:
Checksums checksums;
/// Columns description.
NamesAndTypesList columns;
/// Columns with values, that all have been zeroed by expired ttl
NameSet expired_columns;
......@@ -315,9 +304,13 @@ public:
static UInt64 calculateTotalSizeOnDisk(const String & from);
protected:
/// Columns description.
NamesAndTypesList columns;
Type part_type;
void removeIfNeeded();
virtual void checkConsistency(bool require_part_metadata) const;
virtual void checkConsistency(bool require_part_metadata) const = 0;
void checkConsistencyBase() const;
private:
......
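The central change in this header is encapsulation: the public columns member moves to the protected section, and callers go through getColumns()/setColumns() instead, which is what every call-site hunk below adapts to. A self-contained sketch of the accessor pattern, with simplified types standing in for the real NamesAndTypesList:

#include <string>
#include <utility>
#include <vector>

using NamesAndTypes = std::vector<std::pair<std::string, std::string>>;

class PartSketch
{
public:
    /// Read access goes through a const getter...
    const NamesAndTypes & getColumns() const { return columns; }

    /// ...and mutation through one setter, giving the class a single
    /// place to maintain invariants (e.g. derived per-column state) later.
    void setColumns(const NamesAndTypes & new_columns) { columns = new_columns; }

protected:
    /// Previously public; now only the part classes touch it directly.
    NamesAndTypes columns;
};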
......@@ -68,7 +68,6 @@ protected:
UncompressedCache * uncompressed_cache;
MarkCache * mark_cache;
/// If save_marks_in_cache is false, then, if marks are not in cache, we will load them but won't save in the cache, to avoid evicting other data.
MergeTreeReaderSettings settings;
......
......@@ -9,7 +9,6 @@ namespace DB
{
/// FIXME: implement for compact parts
NameSet injectRequiredColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & part, Names & columns)
{
NameSet required_columns{std::begin(columns), std::end(columns)};
......@@ -244,9 +243,9 @@ MergeTreeReadTaskColumns getReadTaskColumns(const MergeTreeData & storage, const
/// Under owned_data_part->columns_lock we check that all requested columns are of the same type as in the table.
/// This may be not true in case of ALTER MODIFY.
if (!pre_column_names.empty())
storage.check(data_part->columns, pre_column_names);
storage.check(data_part->getColumns(), pre_column_names);
if (!column_names.empty())
storage.check(data_part->columns, column_names);
storage.check(data_part->getColumns(), column_names);
const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
result.pre_columns = physical_columns.addTypes(pre_column_names);
......@@ -254,8 +253,8 @@ MergeTreeReadTaskColumns getReadTaskColumns(const MergeTreeData & storage, const
}
else
{
result.pre_columns = data_part->columns.addTypes(pre_column_names);
result.columns = data_part->columns.addTypes(column_names);
result.pre_columns = data_part->getColumns().addTypes(pre_column_names);
result.columns = data_part->getColumns().addTypes(column_names);
}
result.should_reorder = should_reorder;
......
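Both branches resolve the requested column names to name/type pairs with addTypes. A rough sketch of what an addTypes-style lookup does, assuming it throws on names the part does not know about (simplified containers; the real method lives on NamesAndTypesList):

#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

using Names = std::vector<std::string>;
using NamesAndTypes = std::vector<std::pair<std::string, std::string>>;

/// For every requested name, attach the type recorded for the part.
NamesAndTypes addTypes(const std::map<std::string, std::string> & known, const Names & names)
{
    NamesAndTypes res;
    for (const auto & name : names)
    {
        auto it = known.find(name);
        if (it == known.end())
            throw std::runtime_error("There is no column " + name + " in part");
        res.emplace_back(name, it->second);
    }
    return res;
}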
......@@ -1707,10 +1707,10 @@ void MergeTreeData::alterDataPart(
const auto settings = getSettings();
const auto & part = transaction->getDataPart();
auto res = analyzeAlterConversions(part->columns, new_columns, getIndices().indices, new_indices);
auto res = analyzeAlterConversions(part->getColumns(), new_columns, getIndices().indices, new_indices);
NamesAndTypesList additional_columns;
transaction->rename_map = part->createRenameMapForAlter(res, part->columns);
transaction->rename_map = part->createRenameMapForAlter(res, part->getColumns());
if (!transaction->rename_map.empty())
{
......@@ -1853,7 +1853,7 @@ void MergeTreeData::alterDataPart(
/// Write the new column list to the temporary file.
{
transaction->new_columns = new_columns.filter(part->columns.getNames());
transaction->new_columns = new_columns.filter(part->getColumns().getNames());
WriteBufferFromFile columns_file(part->getFullPath() + "columns.txt.tmp", 4096);
transaction->new_columns.writeText(columns_file);
transaction->rename_map["columns.txt.tmp"] = "columns.txt";
......@@ -1888,7 +1888,7 @@ void MergeTreeData::removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr
return;
NamesAndTypesList new_columns;
for (const auto & [name, type] : data_part->columns)
for (const auto & [name, type] : data_part->getColumns())
if (!empty_columns.count(name))
new_columns.emplace_back(name, type);
......@@ -2762,7 +2762,6 @@ void MergeTreeData::loadPartAndFixMetadata(MutableDataPartPtr part)
String full_part_path = part->getFullPath();
/// Earlier the list of columns was written incorrectly. Delete it and re-create.
/// FIXME looks not right
if (isWidePart(part))
if (Poco::File(full_part_path + "columns.txt").exists())
Poco::File(full_part_path + "columns.txt").remove();
......@@ -2799,7 +2798,7 @@ void MergeTreeData::addPartContributionToColumnSizes(const DataPartPtr & part)
{
std::shared_lock<std::shared_mutex> lock(part->columns_lock);
for (const auto & column : part->columns)
for (const auto & column : part->getColumns())
{
ColumnSize & total_column_size = column_sizes[column.name];
ColumnSize part_column_size = part->getColumnSize(column.name, *column.type);
......@@ -2811,7 +2810,7 @@ void MergeTreeData::removePartContributionToColumnSizes(const DataPartPtr & part
{
std::shared_lock<std::shared_mutex> lock(part->columns_lock);
for (const auto & column : part->columns)
for (const auto & column : part->getColumns())
{
ColumnSize & total_column_size = column_sizes[column.name];
ColumnSize part_column_size = part->getColumnSize(column.name, *column.type);
......
......@@ -183,7 +183,6 @@ public:
MergeTreeDataPartType choosePartType(size_t bytes_on_disk, size_t rows_count) const;
/// After this method setColumns must be called
/// FIXME make this inside this function
MutableDataPartPtr createPart(const String & name,
const MergeTreePartInfo & part_info, const DiskPtr & disk,
const NamesAndTypesList & columns,
......@@ -194,7 +193,7 @@ public:
MergeTreeDataPartType type, const MergeTreePartInfo & part_info,
const DiskPtr & disk, const String & relative_path) const;
/// After this methods loadColumnsChecksumsIndexes must be called
/// After this method 'loadColumnsChecksumsIndexes' must be called
/// FIXME make this inside this function
MutableDataPartPtr createPart(const String & name,
const DiskPtr & disk, const String & relative_path) const;
......
......@@ -965,7 +965,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
});
if (isCompactPart(source_part))
commands_for_part.additional_columns = source_part->columns.getNames();
commands_for_part.additional_columns = source_part->getColumns().getNames();
if (!isStorageTouchedByMutations(storage_from_source_part, commands_for_part, context_for_reading))
{
......@@ -981,7 +981,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
const auto & source_column_names = source_part->columns.getNames();
const auto & source_column_names = source_part->getColumns().getNames();
const auto & updated_column_names = updated_header.getNames();
NameSet new_columns_set(source_column_names.begin(), source_column_names.end());
......@@ -1170,7 +1170,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
{
/// Write a file with a description of columns.
WriteBufferFromFile out_columns(new_part_tmp_path + "columns.txt", 4096);
new_data_part->columns.writeText(out_columns);
new_data_part->getColumns().writeText(out_columns);
}
new_data_part->rows_count = source_part->rows_count;
......
......@@ -209,7 +209,7 @@ NameToNameMap MergeTreeDataPartCompact::createRenameMapForAlter(
return rename_map;
}
void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata)
void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata) const
{
checkConsistencyBase();
String path = getFullPath();
......@@ -253,7 +253,6 @@ void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata)
throw Exception(
"Part " + path + " is broken: bad size of marks file '" + file.path() + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}
}
}
......
......@@ -71,8 +71,6 @@ public:
ColumnSize getTotalColumnsSize() const override;
void checkConsistency(bool /* require_part_metadata */) const override {}
bool hasColumnFiles(const String & column_name, const IDataType & type) const override;
String getFileNameForColumn(const NameAndTypePair & /* column */) const override { return DATA_FILE_NAME; }
......@@ -84,10 +82,10 @@ public:
~MergeTreeDataPartCompact() override;
private:
void checkConsistency(bool require_part_metadata) const override;
/// Loads marks index granularity into memory
void loadIndexGranularity() override;
void checkConsistency(bool require_part_metadata);
};
......
......@@ -82,10 +82,9 @@ public:
bool hasColumnFiles(const String & column, const IDataType & type) const override;
protected:
private:
void checkConsistency(bool require_part_metadata) const override;
private:
/// Loads marks index granularity into memory
void loadIndexGranularity() override;
......
......@@ -42,7 +42,6 @@ void MergeTreeDataPartWriterCompact::write(
/// Fill index granularity for this block
/// if it's unknown (in case of insert data or horizontal merge,
/// but not in case of vertical merge)
/// FIXME maybe it's wrong at this stage.
if (compute_granularity)
fillIndexGranularity(block);
......
......@@ -294,7 +294,6 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch
{
if (!serialization_states.empty())
{
/// FIXME maybe we need skip_offsets=false in some cases
serialize_settings.getter = createStreamGetter(it->name, written_offset_columns ? *written_offset_columns : offset_columns);
it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[it->name]);
}
......
......@@ -6,7 +6,6 @@ namespace DB
class MergeTreeDataPartWriterWide : public IMergeTreeDataPartWriter
{
public:
using ColumnToSize = std::map<std::string, UInt64>;
MergeTreeDataPartWriterWide(
......@@ -26,6 +25,7 @@ public:
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns);
private:
/// Write data of one column.
/// Return how many marks were written and
/// how many rows were written for last mark
......@@ -35,7 +35,6 @@ public:
const IColumn & column,
WrittenOffsetColumns & offset_columns);
private:
/// Write single granule of one column (rows between 2 marks)
size_t writeSingleGranule(
const String & name,
......
......@@ -10,6 +10,8 @@ struct MergeTreeReaderSettings
size_t min_bytes_to_use_direct_io = 0;
size_t min_bytes_to_use_mmap_io = 0;
size_t max_read_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
/// If save_marks_in_cache is false, then, if marks are not in cache,
/// we will load them but won't save in the cache, to avoid evicting other data.
bool save_marks_in_cache = false;
};
......
......@@ -74,8 +74,6 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(const MergeTreeData::DataPartPtr
size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
/// FIXME compute correct granularity
if (continue_reading)
from_mark = next_mark;
......@@ -155,7 +153,7 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
MergeTreeReaderCompact::ColumnPosition MergeTreeReaderCompact::findColumnForOffsets(const String & column_name)
{
String table_name = Nested::extractTableName(column_name);
for (const auto & part_column : data_part->columns)
for (const auto & part_column : data_part->getColumns())
{
if (typeid_cast<const DataTypeArray *>(part_column.type.get()))
{
......@@ -206,7 +204,7 @@ void MergeTreeReaderCompact::initMarksLoader()
if (marks_loader.initialized())
return;
size_t columns_num = data_part->columns.size();
size_t columns_num = data_part->getColumns().size();
auto load = [this, columns_num](const String & mrk_path) -> MarkCache::MappedPtr
{
......@@ -273,7 +271,7 @@ bool MergeTreeReaderCompact::isContinuousReading(size_t mark, size_t column_posi
return false;
const auto & [last_mark, last_column] = *last_read_granule;
return (mark == last_mark && column_position == last_column + 1)
|| (mark == last_mark + 1 && column_position == 0 && last_column == data_part->columns.size() - 1);
|| (mark == last_mark + 1 && column_position == 0 && last_column == data_part->getColumns().size() - 1);
}
}
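The predicate in the last hunk encodes the compact-part layout: inside one mark, all columns are stored one after another, so a read is sequential either when it moves to the next column of the same mark, or when it wraps from the last column of one mark to the first column of the next. The same rule, restated as a standalone function:

#include <cstddef>

/// Continuity rule for compact parts, mirroring isContinuousReading above.
bool isContinuous(size_t mark, size_t column, size_t last_mark, size_t last_column, size_t columns_num)
{
    return (mark == last_mark && column == last_column + 1)
        || (mark == last_mark + 1 && column == 0 && last_column == columns_num - 1);
}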
......@@ -9,9 +9,7 @@
namespace DB
{
/// Reads the data between pairs of marks in the same part. When reading consecutive ranges, avoids unnecessary seeks.
/// When ranges are almost consecutive, seeks are fast because they are performed inside the buffer.
/// Avoids loading the marks file if it is not needed (e.g. when reading the whole part).
/// Reader for compact parts
class MergeTreeReaderCompact : public IMergeTreeReader
{
public:
......@@ -39,7 +37,9 @@ private:
MergeTreeMarksLoader marks_loader;
using ColumnPosition = std::optional<size_t>;
/// Positions of columns in part structure.
std::vector<ColumnPosition> column_positions;
/// Should we read the full column or only its offsets
std::vector<bool> read_only_offsets;
size_t next_mark = 0;
......@@ -52,10 +52,6 @@ private:
size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets = false);
ColumnPosition findColumnForOffsets(const String & column_name);
/// Columns that are read.
friend class MergeTreeRangeReader::DelayedStream;
};
}
......@@ -8,9 +8,7 @@
namespace DB
{
/// Reads the data between pairs of marks in the same part. When reading consecutive ranges, avoids unnecessary seeks.
/// When ranges are almost consecutive, seeks are fast because they are performed inside the buffer.
/// Avoids loading the marks file if it is not needed (e.g. when reading the whole part).
/// Reader for Wide parts.
class MergeTreeReaderWide : public IMergeTreeReader
{
public:
......@@ -35,8 +33,6 @@ private:
FileStreams streams;
/// Columns that are read.
void addStreams(const String & name, const IDataType & type,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
......@@ -44,8 +40,6 @@ private:
const String & name, const IDataType & type, IColumn & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
bool read_offsets = true);
friend class MergeTreeRangeReader::DelayedStream;
};
}
......@@ -15,7 +15,7 @@ static Block replaceTypes(Block && header, const MergeTreeData::DataPartPtr & da
{
/// Types may be different during ALTER (when this stream is used to perform an ALTER).
/// NOTE: We may use similar code to implement non blocking ALTERs.
for (const auto & name_type : data_part->columns)
for (const auto & name_type : data_part->getColumns())
{
if (header.has(name_type.name))
{
......
......@@ -15,7 +15,7 @@ static Block replaceTypes(Block && header, const MergeTreeData::DataPartPtr & da
{
/// Types may be different during ALTER (when this stream is used to perform an ALTER).
/// NOTE: We may use similar code to implement non blocking ALTERs.
for (const auto & name_type : data_part->columns)
for (const auto & name_type : data_part->getColumns())
{
if (header.has(name_type.name))
{
......
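This identical hunk appears twice because two select streams each define a static replaceTypes helper. Its loop body is elided by the diff view; a plausible sketch of the idea, based on the ALTER comment, is below — the header's column types get overwritten with the types the part actually stores on disk (simplified types, not the real Block API):

#include <string>
#include <vector>

struct ColumnSketch { std::string name; std::string type; };
using BlockSketch = std::vector<ColumnSketch>;

/// During ALTER a part may still hold the old column types, so reads
/// built from the table header must be fixed up to the on-disk types.
void replaceTypesSketch(BlockSketch & header, const BlockSketch & part_columns)
{
    for (const auto & part_column : part_columns)
        for (auto & header_column : header)
            if (header_column.name == part_column.name)
                header_column.type = part_column.type;
}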
......@@ -48,7 +48,7 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
else
{
/// take columns from data_part
columns_for_reader = data_part->columns.addTypes(columns_to_read);
columns_for_reader = data_part->getColumns().addTypes(columns_to_read);
}
MergeTreeReaderSettings reader_settings =
......@@ -68,7 +68,7 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
void MergeTreeSequentialBlockInputStream::fixHeader(Block & header_block) const
{
/// Types may be different during ALTER (when this stream is used to perform an ALTER).
for (const auto & name_type : data_part->columns)
for (const auto & name_type : data_part->getColumns())
{
if (header_block.has(name_type.name))
{
......
......@@ -25,13 +25,12 @@ class ASTStorage;
struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
{
/// FIXME description for settings
#define LIST_OF_MERGE_TREE_SETTINGS(M) \
M(SettingUInt64, index_granularity, 8192, "How many rows correspond to one primary key value.", 0) \
\
/** Data storing format settings. */ \
M(SettingUInt64, min_bytes_for_wide_part, 0, "", 0) \
M(SettingUInt64, min_rows_for_wide_part, 10000000000, "", 0) \
M(SettingUInt64, min_bytes_for_wide_part, 0, "Minimal uncompressed size in bytes to create part in wide format instead of compact", 0) \
M(SettingUInt64, min_rows_for_wide_part, 10000000000, "Minimal number of rows to create part in wide format instead of compact", 0) \
\
/** Merge settings. */ \
M(SettingUInt64, merge_max_block_size, DEFAULT_MERGE_BLOCK_SIZE, "How many rows in blocks should be formed for merge operations.", 0) \
......
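The two settings that gained descriptions drive the wide-vs-compact decision made by choosePartType(bytes_on_disk, rows_count) from MergeTreeData.h above. A sketch of how such thresholds plausibly combine — this is an assumption drawn from the new descriptions, not the verified selection code:

#include <cstddef>
#include <cstdint>

enum class PartType { Wide, Compact };

/// A part is stored in the wide format once either threshold is reached;
/// below both thresholds the compact format is used.
PartType choosePartTypeSketch(size_t bytes_on_disk, size_t rows_count,
                              uint64_t min_bytes_for_wide_part, uint64_t min_rows_for_wide_part)
{
    if (bytes_on_disk >= min_bytes_for_wide_part || rows_count >= min_rows_for_wide_part)
        return PartType::Wide;
    return PartType::Compact;
}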
......@@ -27,7 +27,7 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
void MergedColumnOnlyOutputStream::write(const Block & block)
{
std::set<String> skip_indexes_column_names_set;
std::unordered_set<String> skip_indexes_column_names_set;
for (const auto & index : writer->getSkipIndices())
std::copy(index->columns.cbegin(), index->columns.cend(),
std::inserter(skip_indexes_column_names_set, skip_indexes_column_names_set.end()));
......
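The std::set → std::unordered_set switch is safe here because the collected skip-index column names are only ever tested for membership, never iterated in sorted order, so the hash container is the cheaper fit. A small sketch of that usage with simplified types (index->columns in the real code is a column-name list per skip index):

#include <string>
#include <unordered_set>
#include <vector>

/// Gather the names of all columns used by skip indices; order is irrelevant.
std::unordered_set<std::string> collectSkipIndexColumns(
    const std::vector<std::vector<std::string>> & index_columns)
{
    std::unordered_set<std::string> names;
    for (const auto & columns : index_columns)
        names.insert(columns.begin(), columns.end());
    return names;
}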
......@@ -202,7 +202,7 @@ void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::Muta
void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id)
{
storage.check(part->columns);
storage.check(part->getColumns());
assertSessionIsNotExpired(zookeeper);
/// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
......
......@@ -206,7 +206,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
auto table_lock = storage.lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
part->columns, part->checksums);
part->getColumns(), part->checksums);
String part_path = storage.replica_path + "/parts/" + part_name;
String part_znode;
......
......@@ -138,7 +138,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
{
return checkDataPart(
data_part->getFullPath(),
data_part->columns,
data_part->getColumns(),
data_part->getType(),
require_checksums,
is_cancelled);
......
......@@ -767,11 +767,11 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
if (part_name.empty())
part_name = part->name;
check(part->columns);
check(part->getColumns());
int expected_columns_version = columns_version;
auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
part->columns, part->checksums);
part->getColumns(), part->checksums);
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
......@@ -853,7 +853,7 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
ops.emplace_back(zkutil::makeCreateRequest(
part_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(
part_path + "/columns", part->columns.toString(), zkutil::CreateMode::Persistent));
part_path + "/columns", part->getColumns().toString(), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(
part_path + "/checksums", getChecksumsForZooKeeper(part->checksums), zkutil::CreateMode::Persistent));
}
......@@ -5323,7 +5323,7 @@ void StorageReplicatedMergeTree::getCommitPartOps(
{
ops.emplace_back(zkutil::makeCreateRequest(
replica_path + "/parts/" + part->name,
ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(part->columns, part->checksums).toString(),
ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(part->getColumns(), part->checksums).toString(),
zkutil::CreateMode::Persistent));
}
else
......@@ -5334,7 +5334,7 @@ void StorageReplicatedMergeTree::getCommitPartOps(
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(
replica_path + "/parts/" + part->name + "/columns",
part->columns.toString(),
part->getColumns().toString(),
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(
replica_path + "/parts/" + part->name + "/checksums",
......
......@@ -101,7 +101,7 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns_, co
using State = IMergeTreeDataPart::State;
for (const auto & column : part->columns)
for (const auto & column : part->getColumns())
{
size_t j = 0;
......