Commit b738d1ba authored by Vitaliy Lyudvichenko

Add multi-index for data_parts storage and fix bugs. [#CLICKHOUSE-3452]

Fixed handling of obsolete parts.
Fixed conflict resolution between simultaneous PreCommitted covering parts.
Fixed memory leak caused by ordinary MergeTree parts stuck in the Deleting state.
Added a hidden _state column to system.parts.
Parent 34eef961
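The core of the change is replacing the flat std::set data_parts with a boost::multi_index_container that holds the same set of DataPartPtr under two orderings: by part info (name), and by (state, info), so all parts in a given state form one contiguous, name-ordered slice. The following is a minimal, self-contained sketch of that layout; Part, State, partName and partState are hypothetical stand-ins, and a composite_key is used instead of the commit's DataPartStateAndInfo/LessStateDataPart pair:

#include <boost/multi_index_container.hpp>
#include <boost/multi_index/composite_key.hpp>
#include <boost/multi_index/global_fun.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/tuple/tuple.hpp>
#include <iostream>
#include <memory>
#include <string>

enum class State { PreCommitted, Committed, Outdated };

struct Part
{
    std::string name;   /// stands in for MergeTreePartInfo
    State state;
};
using PartPtr = std::shared_ptr<Part>;

/// Key extractors, analogous to dataPartPtrToInfo / dataPartPtrToStateAndInfo in the commit
inline const std::string & partName(const PartPtr & p) { return p->name; }
inline State partState(const PartPtr & p) { return p->state; }

struct TagByName {};
struct TagByStateAndName {};

namespace mi = boost::multi_index;

using Parts = mi::multi_index_container<
    PartPtr,
    mi::indexed_by<
        /// Unique index by name: plays the role of the old std::set data_parts
        mi::ordered_unique<
            mi::tag<TagByName>,
            mi::global_fun<const PartPtr &, const std::string &, &partName>>,
        /// Index by (state, name): parts sharing a state are contiguous and ordered by name
        mi::ordered_unique<
            mi::tag<TagByStateAndName>,
            mi::composite_key<
                PartPtr,
                mi::global_fun<const PartPtr &, State, &partState>,
                mi::global_fun<const PartPtr &, const std::string &, &partName>>>>>;

int main()
{
    Parts parts;
    parts.insert(std::make_shared<Part>(Part{"all_1_1_0", State::Committed}));
    parts.insert(std::make_shared<Part>(Part{"all_2_2_0", State::Committed}));
    parts.insert(std::make_shared<Part>(Part{"all_3_3_0", State::PreCommitted}));

    /// Analogue of getDataPartsStateRange(Committed): slice the (state, name) index by state only
    auto & by_state = parts.get<TagByStateAndName>();
    auto range = by_state.equal_range(boost::make_tuple(State::Committed));
    for (auto it = range.first; it != range.second; ++it)
        std::cout << (*it)->name << " is Committed\n";
}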
......@@ -361,6 +361,7 @@ namespace ErrorCodes
extern const int HTTP_LENGTH_REQUIRED = 381;
extern const int CANNOT_LOAD_CATBOOST_MODEL = 382;
extern const int CANNOT_APPLY_CATBOOST_MODEL = 383;
extern const int PART_IS_TEMPORARILY_LOCKED = 384;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
......@@ -74,6 +74,7 @@ namespace ErrorCodes
extern const int CORRUPTED_DATA;
extern const int INVALID_PARTITION_VALUE;
extern const int METADATA_MISMATCH;
extern const int PART_IS_TEMPORARILY_LOCKED;
}
......@@ -106,7 +107,9 @@ MergeTreeData::MergeTreeData(
database_name(database_), table_name(table_),
full_path(full_path_), columns(columns_),
broken_part_callback(broken_part_callback_),
log_name(log_name_), log(&Logger::get(log_name + " (Data)"))
log_name(log_name_), log(&Logger::get(log_name + " (Data)")),
data_parts_by_name(data_parts_indexes.get<TagByName>()),
data_parts_by_state_and_name(data_parts_indexes.get<TagByStateAndName>())
{
merging_params.check(*columns);
......@@ -381,7 +384,7 @@ Int64 MergeTreeData::getMaxDataPartIndex()
std::lock_guard<std::mutex> lock_all(data_parts_mutex);
Int64 max_block_id = 0;
for (const auto & part : data_parts)
for (const DataPartPtr & part : data_parts_by_name)
max_block_id = std::max(max_block_id, part->info.max_block);
return max_block_id;
......@@ -392,9 +395,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
LOG_DEBUG(log, "Loading data parts");
std::lock_guard<std::mutex> lock(data_parts_mutex);
data_parts.clear();
Strings part_file_names;
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
......@@ -410,6 +410,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
DataPartsVector broken_parts_to_detach;
size_t suspicious_broken_parts = 0;
std::lock_guard<std::mutex> lock(data_parts_mutex);
data_parts_indexes.clear();
for (const String & file_name : part_file_names)
{
MergeTreePartInfo part_info;
......@@ -496,7 +499,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
/// Assume that all parts are Committed; covered parts will be detected and marked as Outdated later
part->state = DataPartState::Committed;
data_parts.insert(part);
if (!data_parts_indexes.insert(part).second)
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
}
if (suspicious_broken_parts > settings.max_suspicious_broken_parts && !skip_sanity_checks)
......@@ -512,13 +516,21 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
/// were merged), but that for some reason are still not deleted from the filesystem.
/// Deletion of files will be performed later in the clearOldParts() method.
if (data_parts.size() >= 2)
if (data_parts_indexes.size() >= 2)
{
auto committed_parts = getDataPartsRange({DataPartState::Committed});
auto prev_jt = committed_parts.begin();
/// Now all parts are committed, so data_parts_by_state_and_name == committed_parts_range
auto prev_jt = data_parts_by_state_and_name.begin();
auto curr_jt = std::next(prev_jt);
while (curr_jt != committed_parts.end())
auto deactivate_part = [&] (DataPartIteratorByStateAndName it)
{
(*it)->remove_time = (*it)->modification_time;
modifyPartState(it, DataPartState::Outdated);
};
(*prev_jt)->assertState({DataPartState::Committed});
while (curr_jt != data_parts_by_state_and_name.end() && (*curr_jt)->state == DataPartState::Committed)
{
/// Don't consider data parts belonging to different partitions.
if ((*curr_jt)->info.partition_id != (*prev_jt)->info.partition_id)
......@@ -530,16 +542,15 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
if ((*curr_jt)->contains(**prev_jt))
{
(*prev_jt)->remove_time = (*prev_jt)->modification_time;
(*prev_jt)->state = DataPartState::Outdated; /// prev_jt becomes invalid here
deactivate_part(prev_jt);
prev_jt = curr_jt;
++curr_jt;
}
else if ((*prev_jt)->contains(**curr_jt))
{
(*curr_jt)->remove_time = (*curr_jt)->modification_time;
(*curr_jt)->state = DataPartState::Outdated; /// curr_jt becomes invalid here
++curr_jt;
auto next = std::next(curr_jt);
deactivate_part(curr_jt);
curr_jt = next;
}
else
{
......@@ -551,7 +562,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
calculateColumnSizesImpl();
LOG_DEBUG(log, "Loaded data parts (" << data_parts.size() << " items)");
LOG_DEBUG(log, "Loaded data parts (" << data_parts_indexes.size() << " items)");
}
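The deactivation loop above exploits the index ordering: with parts sorted by (partition, block range), a covered part is always adjacent to a part that covers it, so one forward sweep with two cursors (prev_jt / curr_jt) finds every covered part. Below is a toy version of the same sweep over a hypothetical Interval type (not ClickHouse code), assuming, as the real code does, that ranges within a partition either nest or do not intersect:

#include <algorithm>
#include <iostream>
#include <string>
#include <tuple>
#include <vector>

/// Hypothetical stand-in for a data part: a block range inside one partition.
struct Interval
{
    std::string partition;
    int min_block;
    int max_block;
    bool active;

    bool contains(const Interval & other) const
    {
        return partition == other.partition
            && min_block <= other.min_block && other.max_block <= max_block;
    }
};

void deactivateCoveredIntervals(std::vector<Interval> & parts)
{
    /// Same ordering idea as the part info comparison: partition first, then block range
    std::sort(parts.begin(), parts.end(), [](const Interval & a, const Interval & b)
    {
        return std::tie(a.partition, a.min_block, a.max_block)
             < std::tie(b.partition, b.min_block, b.max_block);
    });

    if (parts.size() < 2)
        return;

    size_t prev = 0;
    for (size_t curr = 1; curr < parts.size(); ++curr)
    {
        if (parts[curr].partition != parts[prev].partition)
        {
            prev = curr;                     /// parts of different partitions never cover each other
        }
        else if (parts[curr].contains(parts[prev]))
        {
            parts[prev].active = false;      /// prev is covered by curr
            prev = curr;
        }
        else if (parts[prev].contains(parts[curr]))
        {
            parts[curr].active = false;      /// curr is covered by prev; keep prev as the cursor
        }
        else
        {
            prev = curr;                     /// disjoint neighbours, move on
        }
    }
}

int main()
{
    std::vector<Interval> parts = {
        {"2017", 1, 1, true}, {"2017", 1, 10, true}, {"2017", 2, 3, true}, {"2017", 11, 11, true}};
    deactivateCoveredIntervals(parts);
    for (const auto & p : parts)
        if (p.active)
            std::cout << p.partition << " [" << p.min_block << ", " << p.max_block << "]\n";
    /// Prints only 2017 [1, 10] and 2017 [11, 11]; the covered parts are deactivated.
}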
......@@ -619,21 +630,30 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
return res;
time_t now = time(nullptr);
std::vector<DataPartIteratorByStateAndName> parts_to_delete;
{
std::lock_guard<std::mutex> lock_parts(data_parts_mutex);
for (auto it = data_parts.begin(); it != data_parts.end(); ++it)
auto outdated_parts_range = getDataPartsStateRange(DataPartState::Outdated);
for (auto it = outdated_parts_range.begin(); it != outdated_parts_range.end(); ++it)
{
if ((*it)->state == DataPartState::Outdated &&
it->unique() && /// Grab only parts that are not used by anyone (SELECTs for example)
(*it)->remove_time < now &&
now - (*it)->remove_time > settings.old_parts_lifetime.totalSeconds())
const DataPartPtr & part = *it;
if (part.unique() && /// Grab only parts that are not used by anyone (SELECTs for example)
part->remove_time < now &&
now - part->remove_time > settings.old_parts_lifetime.totalSeconds())
{
(*it)->state = DataPartState::Deleting;
res.push_back(*it);
parts_to_delete.emplace_back(it);
}
}
res.reserve(parts_to_delete.size());
for (const auto & it_to_delete : parts_to_delete)
{
res.emplace_back(*it_to_delete);
modifyPartState(it_to_delete, DataPartState::Deleting);
}
}
if (!res.empty())
......@@ -650,7 +670,7 @@ void MergeTreeData::rollbackDeletingParts(const MergeTreeData::DataPartsVector &
{
/// We should modify it under data_parts_mutex
part->assertState({DataPartState::Deleting});
part->state = DataPartState::Outdated;
modifyPartState(part, DataPartState::Outdated);
}
}
......@@ -661,26 +681,27 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
/// TODO: use data_parts iterators instead of pointers
for (auto & part : parts)
{
if (part->state != DataPartState::Deleting)
throw Exception("An attempt to delete part " + part->getNameWithState() + " with unexpected state", ErrorCodes::LOGICAL_ERROR);
auto it = data_parts.find(part);
if (it == data_parts.end())
auto it = data_parts_by_name.find(part->info);
if (it == data_parts_by_name.end())
throw Exception("Deleting data part " + part->name + " is not exist", ErrorCodes::LOGICAL_ERROR);
data_parts.erase(it);
(*it)->assertState({DataPartState::Deleting});
data_parts_indexes.erase(it);
}
}
void MergeTreeData::clearOldParts()
void MergeTreeData::clearOldPartsFromFilesystem()
{
auto parts_to_remove = grabOldParts();
for (const DataPartPtr & part : parts_to_remove)
{
LOG_DEBUG(log, "Removing part " << part->name);
LOG_DEBUG(log, "Removing part from filesystem " << part->name);
part->remove();
}
removePartsFinally(parts_to_remove);
}
void MergeTreeData::setPath(const String & new_full_path, bool move_data)
......@@ -710,7 +731,7 @@ void MergeTreeData::dropAllData()
LOG_TRACE(log, "dropAllData: removing data from memory.");
data_parts.clear();
data_parts_indexes.clear();
column_sizes.clear();
context.dropCaches();
......@@ -1319,9 +1340,13 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
part->assertState({DataPartState::Temporary});
DataPartsVector replaced;
MergeTreePartInfo part_info = part->info;
String part_name;
DataPartsVector replaced_parts;
std::vector<DataPartIteratorByStateAndName> replaced_iterators;
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
std::unique_lock<std::mutex> lock(data_parts_mutex);
if (DataPartPtr existing_part_in_partition = getAnyPartInPartition(part->info.partition_id, lock))
{
......@@ -1336,141 +1361,163 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
* Otherwise there is a race condition: a merge of blocks could happen in an interval that doesn't yet contain the new part.
*/
if (increment)
part->info.min_block = part->info.max_block = increment->get();
part_info.min_block = part_info.max_block = increment->get();
String new_name;
if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
new_name = part->info.getPartNameV0(part->getMinDate(), part->getMaxDate());
part_name = part_info.getPartNameV0(part->getMinDate(), part->getMaxDate());
else
new_name = part->info.getPartName();
part_name = part_info.getPartName();
LOG_TRACE(log, "Renaming temporary part " << part->relative_path << " to " << new_name << ".");
LOG_TRACE(log, "Renaming temporary part " << part->relative_path << " to " << part_name << ".");
auto it_duplicate = data_parts.find(part);
if (it_duplicate != data_parts.end())
auto it_duplicate = data_parts_by_name.find(part_info);
if (it_duplicate != data_parts_by_name.end())
{
String message = "Part " + (*it_duplicate)->getNameWithState() + " already exists";
if ((*it_duplicate)->checkState({DataPartState::Outdated, DataPartState::Deleting}))
message += ", but it will be deleted soon";
{
throw Exception(message + ", but it will be deleted soon", ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
}
throw Exception(message, ErrorCodes::DUPLICATE_DATA_PART);
}
/// Rename the part only in memory. Will rename it on disk only if all check is passed.
/// It allows us maintain invariant: if non-temporary parts in filesystem then they are in data_parts
part->name = new_name;
/// Check that the part is not covered and doesn't cover other in-progress parts; this makes sense only for Replicated* engines
if (out_transaction)
{
auto check_coverage = [&part_info, &part_name] (const DataPartPtr & part)
{
if (part_info.contains(part->info))
{
throw Exception("Cannot add part " + part_name + " covering pre-committed part " + part->name, ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
}
else
{
if (part->info.contains(part_info))
throw Exception("Cannot add part " + part_name + " covered by pre-committed part " + part->name + ". It is a bug", ErrorCodes::LOGICAL_ERROR);
}
};
auto it_middle = data_parts_by_state_and_name.lower_bound(DataPartStateAndInfo(DataPartState::PreCommitted, part_info));
auto precommitted_parts_range = getDataPartsStateRange(DataPartState::PreCommitted);
for (auto it = it_middle; it != precommitted_parts_range.begin();)
{
--it;
check_coverage(*it);
}
for (auto it = it_middle; it != precommitted_parts_range.end();)
{
check_coverage(*it);
++it;
}
}
/// Is the part covered by some other part?
bool obsolete = false;
DataPartPtr covering_part;
auto check_replacing_part_state = [&] (const DataPartPtr & cur_part)
{
cur_part->assertState({DataPartState::PreCommitted, DataPartState::Committed});
if (cur_part->state == DataPartState::PreCommitted)
throw Exception("Could not add part " + new_name + " while replacing part " + cur_part->name + " is in pre-committed state", ErrorCodes::LOGICAL_ERROR);
};
auto it_middle = data_parts_by_state_and_name.lower_bound(DataPartStateAndInfo(DataPartState::Committed, part_info));
/// Don't consider parts going to be deleted
auto active_parts = getDataPartsRange({DataPartState::Committed, DataPartState::PreCommitted});
/// Parts contained in the part are consecutive in data_parts, intersecting the insertion place for the part itself.
auto it_middle = active_parts.convert(data_parts.lower_bound(part));
auto committed_parts_range = getDataPartsStateRange(DataPartState::Committed);
/// Go to the left.
for (auto it = it_middle; it != active_parts.begin();)
for (auto it = it_middle; it != committed_parts_range.begin();)
{
--it;
if (!part->contains(**it))
if (!part_info.contains((*it)->info))
{
if ((*it)->contains(*part))
obsolete = true;
++it;
if ((*it)->info.contains(part_info))
covering_part = *it;
break;
}
check_replacing_part_state(*it);
replaced.push_back(*it);
// replaced.push_back(*it);
// (*it)->remove_time = time(nullptr);
// (*it)->state = replaced_parts_state;
// removePartContributionToColumnSizes(*it);
// data_parts.erase(it++); /// Yes, ++, not --.
replaced_iterators.push_back(it);
}
/// Parts must be in ascending order.
std::reverse(replaced.begin(), replaced.end());
std::reverse(replaced_iterators.begin(), replaced_iterators.end());
/// Go to the right.
for (auto it = it_middle; it != active_parts.end();)
for (auto it = it_middle; it != committed_parts_range.end();)
{
if ((*it)->name == part->name)
throw Exception("Unexpected duplicate part " + part->getNameWithState() + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
if ((*it)->name == part_name)
throw Exception("Unexpected duplicate part " + (*it)->getNameWithState() + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
if (!part->contains(**it))
if (!part_info.contains((*it)->info))
{
if ((*it)->contains(*part))
obsolete = true;
if ((*it)->info.contains(part_info))
covering_part = *it;
break;
}
check_replacing_part_state(*it);
replaced.push_back(*it);
replaced_iterators.push_back(it);
++it;
// replaced.push_back(*it);
// (*it)->remove_time = time(nullptr);
// (*it)->state = replaced_parts_state;
// removePartContributionToColumnSizes(*it);
// data_parts.erase(it++);
}
if (obsolete)
if (covering_part)
{
LOG_WARNING(log, "Obsolete part " << part->name << " added");
LOG_WARNING(log, "Tried to add obsolete part " << part_name << " covered by " << covering_part->getNameWithState());
/// It is a temporary part, we want to delete it from filesystem immediately
/// Other fields remain the same
part->remove_time = time(nullptr);
/// In case of failure, we want to delete the part from the filesystem immediately (to avoid any conflicts)
part->is_temp = true;
}
else
{
/// Now we can rename part on filesystem
part->is_temp = false;
part->renameTo(new_name);
if (!out_transaction)
/// Nothing to commit or rollback
if (out_transaction)
{
/// Ordinary MergeTree engines (they don't use out_transaction) commit parts immediately
part->state = DataPartState::Committed;
addPartContributionToColumnSizes(part);
}
else
{
/// Whereas ReplicatedMergeTree uses intermediate PreCommitted state
part->state = DataPartState::PreCommitted;
out_transaction->data = this;
out_transaction->parts_to_add_on_rollback = {};
out_transaction->parts_to_remove_on_rollback = {};
}
data_parts.insert(part);
/// We replaced nothing
return {};
}
/// All checks are passed. Now we can rename the part on disk.
/// So, we maintain the invariant: if a non-temporary part exists on the filesystem, then it is present in data_parts
///
/// Ordinary MergeTree engines (they don't use out_transaction) commit parts immediately,
/// whereas ReplicatedMergeTree uses intermediate PreCommitted state
part->name = part_name;
part->info = part_info;
part->is_temp = false;
part->state = (out_transaction) ? DataPartState::PreCommitted : DataPartState::Committed;
part->renameTo(part_name);
data_parts_indexes.insert(part);
replaced_parts.reserve(replaced_iterators.size());
for (auto it_replacing_part : replaced_iterators)
replaced_parts.emplace_back(*it_replacing_part);
if (!out_transaction)
{
addPartContributionToColumnSizes(part);
auto current_time = time(nullptr);
for (auto & replacing_part : replaced)
for (auto it_replacing_part : replaced_iterators)
{
if (!out_transaction)
{
replacing_part->remove_time = current_time;
replacing_part->state = DataPartState::Outdated;
removePartContributionToColumnSizes(replacing_part);
}
(*it_replacing_part)->remove_time = current_time;
modifyPartState(it_replacing_part, DataPartState::Outdated);
removePartContributionToColumnSizes(*it_replacing_part);
}
}
else
{
out_transaction->data = this;
out_transaction->parts_to_add_on_rollback = replaced_parts;
out_transaction->parts_to_remove_on_rollback = {part};
}
}
if (out_transaction)
{
out_transaction->data = this;
out_transaction->parts_to_add_on_rollback = replaced;
out_transaction->parts_to_remove_on_rollback = {part};
}
return replaced;
return replaced_parts;
}
void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bool clear_without_timeout)
......@@ -1479,7 +1526,7 @@ void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bo
for (auto & part : remove)
{
if (!data_parts.count(part))
if (!data_parts_by_name.count(part->info))
throw Exception("Part " + part->getNameWithState() + " not found in data_parts", ErrorCodes::LOGICAL_ERROR);
part->assertState({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated});
......@@ -1490,7 +1537,8 @@ void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bo
{
if (part->state == DataPartState::Committed)
removePartContributionToColumnSizes(part);
part->state = DataPartState::Outdated;
modifyPartState(part, DataPartState::Outdated);
part->remove_time = remove_time;
}
}
......@@ -1502,65 +1550,93 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part_to_detach, cons
LOG_INFO(log, "Renaming " << part_to_detach->relative_path << " to " << prefix << part_to_detach->name << " and detaching it.");
std::lock_guard<std::mutex> lock(data_parts_mutex);
//std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
auto it_part = data_parts.find(part_to_detach);
if (it_part == data_parts.end())
auto it_part = data_parts_by_name.find(part_to_detach->info);
if (it_part == data_parts_by_name.end())
throw Exception("No such data part " + part_to_detach->getNameWithState(), ErrorCodes::NO_SUCH_DATA_PART);
/// What if part_to_detach is a reference to *it_part? Make a new owner just in case.
auto part = *it_part;
DataPartPtr part = *it_part;
removePartContributionToColumnSizes(part);
part->state = DataPartState::Deleting;
if (part->state == DataPartState::Committed)
removePartContributionToColumnSizes(part);
modifyPartState(it_part, DataPartState::Deleting);
if (move_to_detached || !prefix.empty())
part->renameAddPrefix(move_to_detached, prefix);
data_parts_indexes.erase(it_part);
if (restore_covered)
if (restore_covered && part->info.level == 0)
{
auto suitable_parts = getDataPartsRange({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated});
auto it = suitable_parts.convert(data_parts.lower_bound(part));
LOG_WARNING(log, "Will not recover parts covered by zero-level part " << part->name);
return;
}
if (restore_covered)
{
Strings restored;
bool error = false;
String error_parts;
Int64 pos = part->info.min_block;
if (it != suitable_parts.begin())
auto is_appropriate_state = [] (DataPartState state)
{
--it;
if (part->contains(**it))
return state == DataPartState::Committed || state == DataPartState::Outdated;
};
auto update_error = [&] (DataPartIteratorByAndName it)
{
error = true;
error_parts += (*it)->getNameWithState() + " ";
};
auto it_middle = data_parts_by_name.lower_bound(part->info);
/// Restore the leftmost part covered by the part
if (it_middle != data_parts_by_name.begin())
{
auto it = std::prev(it_middle);
if (part->contains(**it) && is_appropriate_state((*it)->state))
{
/// Maybe, we must consider part level somehow
if ((*it)->info.min_block != part->info.min_block)
error = true;
update_error(it);
if ((*it)->state != DataPartState::Committed)
{
addPartContributionToColumnSizes(*it);
(*it)->state = DataPartState::Committed;
modifyPartState(it, DataPartState::Committed); // iterator is not invalidated here
}
pos = (*it)->info.max_block + 1;
restored.push_back((*it)->name);
}
else
error = true;
++it;
update_error(it);
}
else
error = true;
for (; it != suitable_parts.end() && part->contains(**it); ++it)
/// Restore "right" parts
for (auto it = it_middle; it != data_parts_by_name.end() && part->contains(**it); ++it)
{
if ((*it)->info.min_block < pos)
continue;
if (!is_appropriate_state((*it)->state))
{
update_error(it);
continue;
}
if ((*it)->info.min_block > pos)
error = true;
update_error(it);
if ((*it)->state != DataPartState::Committed)
{
addPartContributionToColumnSizes(*it);
(*it)->state = DataPartState::Committed;
modifyPartState(it, DataPartState::Committed);
}
pos = (*it)->info.max_block + 1;
......@@ -1576,18 +1652,24 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part_to_detach, cons
}
if (error)
LOG_ERROR(log, "The set of parts restored in place of " << part->name << " looks incomplete. There might or might not be a data loss.");
{
LOG_ERROR(log, "The set of parts restored in place of " << part->name << " looks incomplete."
<< " There might or might not be a data loss."
<< (error_parts.empty() ? "" : " Suspicious parts: " + error_parts));
}
}
}
size_t MergeTreeData::getTotalActiveSizeInBytes() const
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
size_t res = 0;
for (auto & part : getDataPartsRange({DataPartState::Committed}))
res += part->size_in_bytes;
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
for (auto & part : getDataPartsStateRange(DataPartState::Committed))
res += part->size_in_bytes;
}
return res;
}
......@@ -1601,7 +1683,7 @@ size_t MergeTreeData::getMaxPartsCountForPartition() const
size_t cur_count = 0;
const String * cur_partition_id = nullptr;
for (const auto & part : getDataPartsRange({DataPartState::Committed}))
for (const auto & part : getDataPartsStateRange(DataPartState::Committed))
{
if (cur_partition_id && part->info.partition_id == *cur_partition_id)
{
......@@ -1656,11 +1738,12 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String &
std::lock_guard<std::mutex> lock(data_parts_mutex);
auto committed_parts_range = getDataPartsStateRange(DataPartState::Committed);
/// The part can be covered only by the previous or the next one in data_parts.
auto committed_parts = getDataPartsRange({DataPartState::Committed});
auto it = committed_parts.convert(data_parts.lower_bound(part_info));
auto it = data_parts_by_state_and_name.lower_bound(DataPartStateAndInfo(DataPartState::Committed, part_info));
if (it != committed_parts.end())
if (it != committed_parts_range.end())
{
if ((*it)->name == part_name)
return *it;
......@@ -1668,7 +1751,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String &
return *it;
}
if (it != committed_parts.begin())
if (it != committed_parts_range.begin())
{
--it;
if ((*it)->info.contains(part_info))
......@@ -1685,10 +1768,15 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
std::lock_guard<std::mutex> lock(data_parts_mutex);
auto filtered_parts = getDataPartsRange(valid_states);
auto it = filtered_parts.convert(data_parts.find(part_info));
if (it != filtered_parts.end() && (*it)->name == part_name)
return *it;
auto it = data_parts_by_name.find(part_info);
if (it == data_parts_by_name.end())
return nullptr;
for (auto state : valid_states)
{
if ((*it)->state == state)
return *it;
}
return nullptr;
}
......@@ -1733,7 +1821,8 @@ void MergeTreeData::calculateColumnSizesImpl()
column_sizes.clear();
/// Take into account only committed parts
for (const auto & part : getDataPartsRange({DataPartState::Committed}))
auto committed_parts_range = getDataPartsStateRange(DataPartState::Committed);
for (const auto & part : committed_parts_range)
addPartContributionToColumnSizes(part);
}
......@@ -1945,7 +2034,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
String partition_id = partition.getID(*this);
{
std::lock_guard<std::mutex> data_parts_lock(data_parts_mutex);
std::unique_lock<std::mutex> data_parts_lock(data_parts_mutex);
DataPartPtr existing_part_in_partition = getAnyPartInPartition(partition_id, data_parts_lock);
if (existing_part_in_partition && existing_part_in_partition->partition.value != partition.value)
{
......@@ -1961,28 +2050,48 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
return partition_id;
}
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const DataPartStates & affordable_states) const
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector * out_states) const
{
DataPartsVector res;
DataPartsVector buf;
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
std::copy_if(data_parts.begin(), data_parts.end(), std::back_inserter(res), DataPart::getStatesFilter(affordable_states));
for (auto state : affordable_states)
{
buf = std::move(res);
res.clear();
auto range = getDataPartsStateRange(state);
std::merge(range.begin(), range.end(), buf.begin(), buf.end(), std::back_inserter(res), LessDataPart());
}
if (out_states != nullptr)
{
out_states->resize(res.size());
for (size_t i = 0; i < res.size(); ++i)
(*out_states)[i] = res[i]->state;
}
}
return res;
}
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const MergeTreeData::DataPartStates & affordable_states,
MergeTreeData::DataPartStateVector & out_states_snapshot) const
MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeData::DataPartStateVector * out_states) const
{
DataPartsVector res;
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
std::copy_if(data_parts.begin(), data_parts.end(), std::back_inserter(res), DataPart::getStatesFilter(affordable_states));
res.assign(data_parts_by_name.begin(), data_parts_by_name.end());
out_states_snapshot.resize(res.size());
for (size_t i = 0; i < res.size(); ++i)
out_states_snapshot[i] = res[i]->state;
if (out_states != nullptr)
{
out_states->resize(res.size());
for (size_t i = 0; i < res.size(); ++i)
(*out_states)[i] = res[i]->state;
}
}
return res;
}
......@@ -1991,7 +2100,11 @@ MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affo
DataParts res;
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
std::copy_if(data_parts.begin(), data_parts.end(), std::inserter(res, res.end()), DataPart::getStatesFilter(affordable_states));
for (auto state : affordable_states)
{
auto range = getDataPartsStateRange(state);
res.insert(range.begin(), range.end());
}
}
return res;
}
......@@ -2006,28 +2119,23 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector() const
return getDataPartsVector({DataPartState::Committed});
}
MergeTreeData::DataParts MergeTreeData::getAllDataParts() const
{
return getDataParts({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated});
}
MergeTreeData::DataPartPtr MergeTreeData::getAnyPartInPartition(
const String & partition_id, std::lock_guard<std::mutex> & data_parts_lock)
const String & partition_id, std::unique_lock<std::mutex> & data_parts_lock)
{
auto min_block = std::numeric_limits<Int64>::min();
MergeTreePartInfo dummy_part_info(partition_id, min_block, min_block, 0);
auto committed_parts = getDataPartsRange({DataPartState::Committed});
auto it = committed_parts.convert(data_parts.lower_bound(dummy_part_info));
auto it = data_parts_by_state_and_name.lower_bound(DataPartStateAndInfo(DataPartState::Committed, dummy_part_info));
if (it != committed_parts.end() && (*it)->info.partition_id == partition_id)
if (it != data_parts_by_state_and_name.end() && (*it)->state == DataPartState::Committed && (*it)->info.partition_id == partition_id)
return *it;
return {};
return nullptr;
}
void MergeTreeData::Transaction::rollback()
{
if (data && (!parts_to_remove_on_rollback.empty() || !parts_to_add_on_rollback.empty()))
if (!isEmpty())
{
std::stringstream ss;
if (!parts_to_remove_on_rollback.empty())
......@@ -2049,14 +2157,19 @@ void MergeTreeData::Transaction::rollback()
/// PreCommitted -> Outdated
replaceParts(DataPartState::Outdated, DataPartState::Committed, true);
clear();
}
clear();
}
void MergeTreeData::Transaction::commit()
{
/// PreCommitted -> Committed, Committed -> Outdated
replaceParts(DataPartState::Committed, DataPartState::Outdated, false);
if (!isEmpty())
{
/// PreCommitted -> Committed, Committed -> Outdated
replaceParts(DataPartState::Committed, DataPartState::Outdated, false);
}
clear();
}
......@@ -2080,9 +2193,9 @@ void MergeTreeData::Transaction::replaceParts(MergeTreeData::DataPartState move_
/// If it is rollback then do nothing, else make it Outdated and remove their size contribution
if (move_committed_to != DataPartState::Committed)
{
for (auto & part : committed_parts)
for (const DataPartPtr & part : committed_parts)
{
part->state = move_committed_to;
data->modifyPartState(part, move_committed_to);
part->remove_time = remove_time;
data->removePartContributionToColumnSizes(part);
}
......@@ -2091,7 +2204,7 @@ void MergeTreeData::Transaction::replaceParts(MergeTreeData::DataPartState move_
/// If it is rollback just change state to Outdated, else change state to Committed and add their size contribution
for (auto & part : precommitted_parts)
{
part->state = move_precommitted_to;
data->modifyPartState(part, move_precommitted_to);
if (move_precommitted_to == DataPartState::Committed)
data->addPartContributionToColumnSizes(part);
else
......
......@@ -15,7 +15,10 @@
#include <DataStreams/GraphiteRollupSortedBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <common/RangeFiltered.h>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/global_fun.hpp>
#include <boost/range/iterator_range_core.hpp>
namespace DB
{
......@@ -102,7 +105,16 @@ public:
using DataPartStates = std::initializer_list<DataPartState>;
using DataPartStateVector = std::vector<DataPartState>;
struct DataPartPtrLess
/// Auxiliary structure for index comparison. Keep the lifetime of the referenced MergeTreePartInfo in mind.
struct DataPartStateAndInfo
{
DataPartState state;
const MergeTreePartInfo & info;
DataPartStateAndInfo(DataPartState state, const MergeTreePartInfo & info) : state(state), info(info) {}
};
struct LessDataPart
{
using is_transparent = void;
......@@ -111,7 +123,28 @@ public:
bool operator()(const DataPartPtr & lhs, const DataPartPtr & rhs) const { return lhs->info < rhs->info; }
};
using DataParts = std::set<DataPartPtr, DataPartPtrLess>;
struct LessStateDataPart
{
using is_transparent = void;
bool operator() (const DataPartStateAndInfo & lhs, const DataPartStateAndInfo & rhs) const
{
return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.info)
< std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.info);
}
bool operator() (DataPartStateAndInfo info, const DataPartState & state) const
{
return static_cast<size_t>(info.state) < static_cast<size_t>(state);
}
bool operator() (const DataPartState & state, DataPartStateAndInfo info) const
{
return static_cast<size_t>(state) < static_cast<size_t>(info.state);
}
};
using DataParts = std::set<DataPartPtr, LessDataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
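Both LessDataPart and LessStateDataPart declare "using is_transparent = void;", which lets ordered containers built on them be searched with a key other than the stored DataPartPtr (for example a bare MergeTreePartInfo, or a DataPartState for the state index), without constructing a temporary part pointer. A tiny self-contained illustration of the same heterogeneous-lookup idea, using a plain std::set and hypothetical Part/LessPart types rather than the real ones:

#include <cassert>
#include <memory>
#include <set>
#include <string>

/// Hypothetical part type; only the name matters for ordering here.
struct Part { std::string name; };
using PartPtr = std::shared_ptr<Part>;

/// Like LessDataPart: is_transparent enables lookups by PartPtr or directly by the key.
struct LessPart
{
    using is_transparent = void;

    bool operator()(const PartPtr & lhs, const PartPtr & rhs) const { return lhs->name < rhs->name; }
    bool operator()(const std::string & name, const PartPtr & rhs) const { return name < rhs->name; }
    bool operator()(const PartPtr & lhs, const std::string & name) const { return lhs->name < name; }
};

int main()
{
    std::set<PartPtr, LessPart> parts;
    parts.insert(std::make_shared<Part>(Part{"all_1_1_0"}));
    parts.insert(std::make_shared<Part>(Part{"all_2_2_0"}));

    /// Heterogeneous lookup (C++14): no temporary PartPtr is built for the probe key
    assert(parts.find(std::string("all_2_2_0")) != parts.end());
    assert(parts.count(std::string("all_3_3_0")) == 0);
    return 0;
}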
/// Some operations on the set of parts return a Transaction object.
......@@ -125,6 +158,11 @@ public:
void rollback();
bool isEmpty() const
{
return parts_to_add_on_rollback.empty() && parts_to_remove_on_rollback.empty();
}
~Transaction()
{
try
......@@ -304,22 +342,17 @@ public:
/// Returns a copy of the list so that the caller shouldn't worry about locks.
DataParts getDataParts(const DataPartStates & affordable_states) const;
DataPartsVector getDataPartsVector(const DataPartStates & affordable_states) const;
DataPartsVector getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector & out_states_snapshot) const;
/// Returns a sorted list of the parts with the specified states
/// out_states will contain a snapshot of each part's state
DataPartsVector getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector * out_states = nullptr) const;
/// Returns a virtual container iteration only through parts with specified states
decltype(auto) getDataPartsRange(const DataPartStates & affordable_states) const
{
return createRangeFiltered(DataPart::getStatesFilter(affordable_states), data_parts);
}
/// Returns absolutely all parts (and snapshot of their states)
DataPartsVector getAllDataPartsVector(DataPartStateVector * out_states = nullptr) const;
/// Returns Committed parts
DataParts getDataParts() const;
DataPartsVector getDataPartsVector() const;
/// Returns all parts except Temporary and Deleting ones
DataParts getAllDataParts() const;
/// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr.
DataPartPtr getActiveContainingPart(const String & part_name);
......@@ -367,8 +400,8 @@ public:
/// Removes parts from data_parts; they should be in the Deleting state
void removePartsFinally(const DataPartsVector & parts);
/// Delete irrelevant parts.
void clearOldParts();
/// Delete irrelevant parts from memory and disk.
void clearOldPartsFromFilesystem();
/// Delete all directories whose names begin with "tmp"
/// Set non-negative parameter value to override MergeTreeSettings temporary_directories_lifetime
......@@ -530,10 +563,81 @@ private:
String log_name;
Logger * log;
/// Work with data parts
struct TagByName{};
struct TagByStateAndName{};
static const MergeTreePartInfo & dataPartPtrToInfo(const DataPartPtr & part)
{
return part->info;
}
static DataPartStateAndInfo dataPartPtrToStateAndInfo(const DataPartPtr & part)
{
return {part->state, part->info};
};
using DataPartsIndexes = boost::multi_index_container<DataPartPtr,
boost::multi_index::indexed_by<
/// Index by Name
boost::multi_index::ordered_unique<
boost::multi_index::tag<TagByName>,
boost::multi_index::global_fun<const DataPartPtr &, const MergeTreePartInfo &, dataPartPtrToInfo>
>,
/// Index by (State, Name), used to obtain ordered slices of parts with the same state
boost::multi_index::ordered_unique<
boost::multi_index::tag<TagByStateAndName>,
boost::multi_index::global_fun<const DataPartPtr &, DataPartStateAndInfo, dataPartPtrToStateAndInfo>,
LessStateDataPart
>
>
>;
/// Current set of data parts.
DataParts data_parts;
mutable std::mutex data_parts_mutex;
// TODO: this mutex could be a bottleneck. If so, make it shared, and split parts onto partitions
DataPartsIndexes data_parts_indexes;
DataPartsIndexes::index<TagByName>::type & data_parts_by_name;
DataPartsIndexes::index<TagByStateAndName>::type & data_parts_by_state_and_name;
using DataPartIteratorByAndName = DataPartsIndexes::index<TagByName>::type::iterator;
using DataPartIteratorByStateAndName = DataPartsIndexes::index<TagByStateAndName>::type::iterator;
boost::iterator_range<DataPartIteratorByStateAndName> getDataPartsStateRange(DataPartState state) const
{
auto begin = data_parts_by_state_and_name.lower_bound(state, LessStateDataPart());
auto end = data_parts_by_state_and_name.upper_bound(state, LessStateDataPart());
return {begin, end};
}
static decltype(auto) getStateModifier(DataPartState state)
{
return [state] (const DataPartPtr & part) { part->state = state; };
}
void modifyPartState(DataPartIteratorByStateAndName it, DataPartState state)
{
if (!data_parts_by_state_and_name.modify(it, getStateModifier(state)))
throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
}
void modifyPartState(DataPartIteratorByAndName it, DataPartState state)
{
if (!data_parts_by_state_and_name.modify(data_parts_indexes.project<TagByStateAndName>(it), getStateModifier(state)))
throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
}
void modifyPartState(const DataPartPtr & part, DataPartState state)
{
auto it = data_parts_by_name.find(part->info);
if (it == data_parts_by_name.end() || (*it).get() != part.get())
throw Exception("Part " + part->name + " is not exists", ErrorCodes::LOGICAL_ERROR);
if (!data_parts_by_state_and_name.modify(data_parts_indexes.project<TagByStateAndName>(it), getStateModifier(state)))
throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
}
/// Used to serialize calls to grabOldParts.
std::mutex grab_old_parts_mutex;
......@@ -566,7 +670,7 @@ private:
void removePartContributionToColumnSizes(const DataPartPtr & part);
/// If there is no part in the partition with ID `partition_id`, returns empty ptr. Should be called under the lock.
DataPartPtr getAnyPartInPartition(const String & partition_id, std::lock_guard<std::mutex> & data_parts_lock);
DataPartPtr getAnyPartInPartition(const String & partition_id, std::unique_lock<std::mutex> & data_parts_lock);
};
}
......@@ -185,7 +185,7 @@ bool MergeTreeDataMerger::selectPartsToMerge(
if (prev_part && part->info.partition_id == (*prev_part)->info.partition_id
&& part->info.min_block < (*prev_part)->info.max_block)
{
LOG_ERROR(log, "Part " << part->name << " intersects previous part " << (*prev_part)->name);
LOG_ERROR(log, "Part " << part->getNameWithState() << " intersects previous part " << (*prev_part)->getNameWithState());
}
prev_part = &part;
......
......@@ -38,6 +38,7 @@ namespace ErrorCodes
extern const int FORMAT_VERSION_TOO_OLD;
extern const int UNKNOWN_FORMAT;
extern const int UNEXPECTED_FILE_IN_DATA_PART;
extern const int NOT_FOUND_EXPECTED_DATA_PART;
}
......@@ -934,4 +935,16 @@ String MergeTreeDataPart::stateString() const
return stateToString(state);
}
void MergeTreeDataPart::assertState(const std::initializer_list<MergeTreeDataPart::State> & affordable_states) const
{
if (!checkState(affordable_states))
{
String states_str;
for (auto state : affordable_states)
states_str += stateToString(state) + " ";
throw Exception("Unexpected state of part " + getNameWithState() + ". Expected: " + states_str, ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
}
}
}
......@@ -187,17 +187,7 @@ struct MergeTreeDataPart
}
/// Throws an exception if state of the part is not in affordable_states
void assertState(const std::initializer_list<State> & affordable_states) const
{
if (!checkState(affordable_states))
{
String states_str;
for (auto state : affordable_states)
states_str += stateToString(state) + " ";
throw Exception("Unexpected state of part " + getNameWithState() + ". Expected: " + states_str);
}
}
void assertState(const std::initializer_list<State> & affordable_states) const;
/// In comparison with lambdas, it is move-assignable and can have several operator() overloads
struct StatesFilter
......@@ -324,4 +314,7 @@ private:
void checkConsistency(bool require_part_metadata);
};
using MergeTreeDataPartState = MergeTreeDataPart::State;
}
......@@ -68,7 +68,7 @@ StorageMergeTree::StorageMergeTree(
}
else
{
data.clearOldParts();
data.clearOldPartsFromFilesystem();
}
/// Temporary directories contain incomplete results of merges (after forced restart)
......@@ -188,7 +188,7 @@ void StorageMergeTree::alter(
if (primary_key_is_modified && supportsSampling())
throw Exception("MODIFY PRIMARY KEY only supported for tables without sampling key", ErrorCodes::BAD_ARGUMENTS);
MergeTreeData::DataParts parts = data.getAllDataParts();
auto parts = data.getDataParts({MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
for (const MergeTreeData::DataPartPtr & part : parts)
{
if (auto transaction = data.alterDataPart(part, columns_for_parts, new_primary_key_ast, false))
......@@ -291,7 +291,7 @@ bool StorageMergeTree::merge(
/// Clear old parts. It does not matter to do it more frequently than each second.
if (auto lock = time_after_previous_cleanup.lockTestAndRestartAfter(1))
{
data.clearOldParts();
data.clearOldPartsFromFilesystem();
data.clearOldTemporaryDirectories();
}
......
......@@ -93,6 +93,7 @@ namespace ErrorCodes
extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
extern const int TOO_MUCH_FETCHES;
extern const int BAD_DATA_PART_NAME;
extern const int PART_IS_TEMPORARILY_LOCKED;
}
......@@ -766,7 +767,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
/// Parts in ZK.
NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());
MergeTreeData::DataParts parts = data.getAllDataParts();
auto parts = data.getDataParts({MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
/// Local parts that are not in ZK.
MergeTreeData::DataParts unexpected_parts;
......@@ -1145,7 +1146,21 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
if (!do_fetch)
{
merger.renameMergedTemporaryPart(part, parts, &transaction);
getZooKeeper()->multi(ops); /// After long merge, get fresh ZK handle, because previous session may be expired.
/// Do not commit if the part is obsolete
if (!transaction.isEmpty())
{
getZooKeeper()->multi(ops); /// After long merge, get fresh ZK handle, because previous session may be expired.
transaction.commit();
}
/** Removing old parts from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts.
*/
/** With `ZCONNECTIONLOSS` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
* This is not a problem, because in this case the merge will remain in the queue, and we will try again.
*/
merge_selecting_handle->schedule();
if (auto part_log = context.getPartLog(database_name, table_name))
{
......@@ -1178,15 +1193,6 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
}
}
/** Removing old chunks from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts.
*/
/** With `ZCONNECTIONLOSS` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
* This is not a problem, because in this case the merge will remain in the queue, and we will try again.
*/
transaction.commit();
merge_selecting_handle->schedule();
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
}
}
......@@ -1409,8 +1415,9 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
/// It's important that no old parts remain (after the merge), because otherwise,
/// after adding a new replica, this new replica downloads them, but does not delete them.
/// And if they are not deleted, the parts will come back to life after the server is restarted.
/// Therefore, we use getAllDataParts.
auto parts = data.getAllDataParts();
/// Therefore, we use all data parts.
auto parts = data.getDataParts({MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
for (const auto & part : parts)
{
if (!entry_part_info.contains(part->info))
......@@ -1577,6 +1584,11 @@ bool StorageReplicatedMergeTree::queueTask()
/// Interrupted merge or downloading a part is not an error.
LOG_INFO(log, e.message());
}
else if (e.code() == ErrorCodes::PART_IS_TEMPORARILY_LOCKED)
{
/// The part cannot be added right now because it is temporarily locked
LOG_INFO(log, e.displayText());
}
else
tryLogCurrentException(__PRETTY_FUNCTION__);
......@@ -2161,6 +2173,13 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
MergeTreeData::Transaction transaction;
auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction);
/// Do not commit if the part is obsolete
if (!transaction.isEmpty())
{
getZooKeeper()->multi(ops);
transaction.commit();
}
if (auto part_log = context.getPartLog(database_name, table_name))
{
PartLogElement elem;
......@@ -2192,10 +2211,6 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
}
}
getZooKeeper()->multi(ops);
transaction.commit();
/** If a quorum is tracked for this part, you must update it.
* If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method.
*/
......
......@@ -39,7 +39,7 @@ StorageSystemParts::StorageSystemParts(const std::string & name_)
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()}
}
{
}
......@@ -53,9 +53,12 @@ BlockInputStreams StorageSystemParts::read(
const size_t max_block_size,
const unsigned num_streams)
{
check(column_names);
//check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
auto it_state_column = std::find(column_names.begin(), column_names.end(), "_state");
bool has_state_column = it_state_column != column_names.end();
/// Will apply WHERE to subset of columns and then add more columns.
/// This is kind of complicated, but we use WHERE to do less work.
......@@ -142,6 +145,8 @@ BlockInputStreams StorageSystemParts::read(
/// Finally, create the result.
Block block = getSampleBlock();
if (has_state_column)
block.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "_state"));
for (size_t i = 0; i < filtered_database_column->size();)
{
......@@ -198,10 +203,18 @@ BlockInputStreams StorageSystemParts::read(
using State = MergeTreeDataPart::State;
MergeTreeData::DataPartStateVector all_parts_state;
MergeTreeData::DataPartsVector all_parts;
if (need[0])
all_parts = data->getDataPartsVector({State::Committed, State::Outdated}, all_parts_state);
{
/// If the _state column is requested, return parts in all states
if (!has_state_column)
all_parts = data->getDataPartsVector({State::Committed, State::Outdated}, &all_parts_state);
else
all_parts = data->getAllDataPartsVector(&all_parts_state);
}
else
all_parts = data->getDataPartsVector({State::Committed}, all_parts_state);
all_parts = data->getDataPartsVector({State::Committed}, &all_parts_state);
/// Finally, we'll go through the list of parts.
for (size_t part_number = 0; part_number < all_parts.size(); ++part_number)
......@@ -248,11 +261,30 @@ BlockInputStreams StorageSystemParts::read(
block.getByPosition(i++).column->insert(database);
block.getByPosition(i++).column->insert(table);
block.getByPosition(i++).column->insert(engine);
if (has_state_column)
block.getByPosition(i++).column->insert(part->stateString());
}
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(block));
}
NameAndTypePair StorageSystemParts::getColumn(const String & column_name) const
{
if (column_name == "_state")
return NameAndTypePair("_state", std::make_shared<DataTypeString>());
return ITableDeclaration::getColumn(column_name);
}
bool StorageSystemParts::hasColumn(const String & column_name) const
{
if (column_name == "_state")
return true;
return ITableDeclaration::hasColumn(column_name);
}
}
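With these overrides, _state acts as a hidden virtual column: it is not part of the table's regular column list, but a query that names it explicitly (for example SELECT name, _state FROM system.parts) should get the textual state of each part, and requesting it also switches the read path above to getAllDataPartsVector(), so parts in every state are returned.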
......@@ -20,6 +20,10 @@ public:
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
......
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wsign-compare"
#include <gtest/gtest.h>
#pragma GCC diagnostic pop
#include <common/RangeFiltered.h>
#include <vector>
#include <set>
TEST(RangeFiltered, simple)
{
std::vector<int> v;
for (int i = 0; i < 10; ++i)
v.push_back(i);
auto v30 = createRangeFiltered([] (int i) { return i % 3 == 0; }, v);
auto v31 = createRangeFiltered([] (int i) { return i % 3 != 0; }, v);
for (const int & i : v30)
ASSERT_EQ(i % 3, 0);
for (const int & i : v31)
ASSERT_NE(i % 3, 0);
{
auto it = v30.begin();
ASSERT_EQ(*it, 0);
auto it2 = std::next(it);
ASSERT_EQ(*it2, 3);
it = std::next(it2);
ASSERT_EQ(*it, 6);
}
{
auto it = std::next(v30.begin());
ASSERT_EQ(*it, 3);
*it = 2; /// it becomes invalid
ASSERT_EQ(*(++it), 6); /// but iteration is successful
*v30.begin() = 1;
ASSERT_EQ(*v30.begin(), 6);
}
}
......@@ -26,6 +26,7 @@ def started_cluster():
pass
cluster.shutdown()
def test_random_inserts(started_cluster):
# Duration of the test, reduce it if you don't want to wait
DURATION_SECONDS = 10# * 60
......@@ -55,7 +56,9 @@ def test_random_inserts(started_cluster):
inserter.get_answer()
answer="{}\t{}\t{}\t{}\n".format(num_timestamps, num_timestamps, min_timestamp, max_timestamp)
for node in nodes:
assert TSV(node.query("SELECT count(), uniqExact(i), min(i), max(i) FROM simple")) == TSV(answer), node.name + " : " + node.query("SELECT groupArray(_part), i, count() AS c FROM simple GROUP BY i ORDER BY c DESC LIMIT 1")
res = node.query("SELECT count(), uniqExact(i), min(i), max(i) FROM simple")
assert TSV(res) == TSV(answer), node.name + " : " + node.query("SELECT groupArray(_part), i, count() AS c FROM simple GROUP BY i ORDER BY c DESC LIMIT 1")
node1.query("""DROP TABLE simple ON CLUSTER test_cluster""")
......@@ -4,6 +4,7 @@
[[ -n "$1" ]] && host="$1" || host="localhost"
[[ -n "$2" ]] && min_timestamp="$2" || min_timestamp=$(( $(date +%s) - 10 ))
[[ -n "$3" ]] && max_timestamp="$3" || max_timestamp=$(( $(date +%s) + 10 ))
[[ -n "$4" ]] && iters_per_timestamp="$4" || iters_per_timestamp=1
timestamps=`seq $min_timestamp $max_timestamp`
......@@ -40,6 +41,6 @@ for i in $timestamps; do
cur_timestamp=$(date +%s)
done
#echo $i >> $host".txt"
reliable_insert "$i"
done
\ No newline at end of file
done
sleep 1
#pragma once
#include <type_traits>
/// Similar to boost::filtered_range, but a little simpler; it also allows converting ordinary iterators to filtered ones
template <typename F, typename C>
struct RangeFiltered
{
/// Template parameter C may be const. Then const_iterator is used.
using RawIterator = decltype(std::declval<C>().begin());
class Iterator;
/// Will iterate over elements for which filter(*it) == true
template <typename F_, typename C_> /// Another template for universal references to work.
RangeFiltered(F_ && filter, C_ && container)
: filter(std::move(filter)), container(container) {}
Iterator begin() const
{
return Iterator{*this, std::begin(container)};
}
Iterator end() const
{
return Iterator{*this, std::end(container)};
}
/// Convert ordinary iterator to filtered one
/// The real position will be in the range [ordinary_iterator, end()], so it is suitable for use with lower_bound()/upper_bound()
inline Iterator convert(RawIterator ordinary_iterator) const
{
return Iterator{*this, ordinary_iterator};
}
/// It is similar to boost::filtered_iterator, but has additional features:
/// it doesn't store end() iterator
/// it doesn't store the predicate, so operator=() can be implemented
/// it guarantees that operator++() works properly in case of filter(*it) == false
class Iterator
{
public:
using Range = RangeFiltered<F, C>;
typedef Iterator self_type;
typedef typename std::iterator_traits<RawIterator>::value_type value_type;
typedef typename std::iterator_traits<RawIterator>::reference reference;
typedef const value_type & const_reference;
typedef typename std::iterator_traits<RawIterator>::pointer pointer;
typedef const value_type * const_pointer;
typedef typename std::iterator_traits<RawIterator>::difference_type difference_type;
typedef std::bidirectional_iterator_tag iterator_category;
Iterator(const Range & range_, RawIterator iter_)
: range(&range_), iter(iter_)
{
for (; iter != std::end(range->container) && !range->filter(*iter); ++iter);
}
Iterator(const Iterator & rhs) = default;
Iterator(Iterator && rhs) = default;
Iterator operator++()
{
++iter;
for (; iter != std::end(range->container) && !range->filter(*iter); ++iter);
return *this;
}
Iterator operator--()
{
--iter;
for (; !range->filter(*iter); --iter); /// Don't check std::begin() bound
return *this;
}
pointer operator->()
{
return iter.operator->();
}
const_pointer operator->() const
{
return iter.operator->();
}
reference operator*()
{
return *iter;
}
const_reference operator*() const
{
return *iter;
}
bool operator==(const self_type & rhs) const
{
return iter == rhs.iter;
}
bool operator!=(const self_type & rhs) const
{
return iter != rhs.iter;
}
self_type & operator=(const self_type & rhs) = default;
self_type & operator=(self_type && rhs) = default;
~Iterator() = default;
private:
const Range * range = nullptr;
RawIterator iter;
};
protected:
F filter;
C & container;
};
template <typename F, typename C>
inline RangeFiltered<std::decay_t<F>, std::remove_reference_t<C>> createRangeFiltered(F && filter, C && container)
{
return {std::forward<F>(filter), std::forward<C>(container)};
};