提交 1e29795c 编写于 作者: V Vitaliy Lyudvichenko

Fix race condition between cleanup and fetch thread. [#CLICKHOUSE-2989]

It was happening during ALTERs.
TODO: stop fetches during ALTER.
上级 d9637c29
......@@ -73,7 +73,8 @@ MergeTreeData::MergeTreeData(
const String & log_name_,
bool require_part_metadata_,
bool attach,
BrokenPartCallback broken_part_callback_)
BrokenPartCallback broken_part_callback_,
PartsCleanCallback parts_clean_callback_)
: ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
date_column_name(date_column_name_), sampling_expression(sampling_expression_),
index_granularity(index_granularity_),
......@@ -83,6 +84,7 @@ MergeTreeData::MergeTreeData(
database_name(database_), table_name(table_),
full_path(full_path_), columns(columns_),
broken_part_callback(broken_part_callback_),
parts_clean_callback(parts_clean_callback_ ? parts_clean_callback_ : [this](){ clearOldParts(); }),
log_name(log_name_), log(&Logger::get(log_name + " (Data)"))
{
/// Check that the date column exists and is of type Date.
......@@ -1203,6 +1205,15 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
throw Exception("Part " + new_name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
}
bool in_all_data_parts;
{
std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
in_all_data_parts = all_data_parts.count(part) != 0;
}
/// New part can be removed from data_parts but not from file system and ZooKeeper
if (in_all_data_parts)
clearOldPartsAndRemoveFromZK();
/// Rename the part.
part->renameTo(new_name);
part->is_temp = false;
......@@ -1305,7 +1316,7 @@ void MergeTreeData::attachPart(const DataPartPtr & part)
void MergeTreeData::renameAndDetachPart(const DataPartPtr & part, const String & prefix, bool restore_covered, bool move_to_detached)
{
LOG_INFO(log, "Renaming " << part->name << " to " << prefix << part->name << " and detaching it.");
LOG_INFO(log, "Renaming " << part->relative_path << " to " << prefix << part->name << " and detaching it.");
std::lock_guard<std::mutex> lock(data_parts_mutex);
std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
......@@ -1764,4 +1775,34 @@ DayNum_t MergeTreeData::getMonthFromPartPrefix(const String & part_prefix)
return getMonthFromName(part_prefix.substr(0, strlen("YYYYMM")));
}
void MergeTreeData::Transaction::rollback()
{
if (data && (!parts_to_remove_on_rollback.empty() || !parts_to_add_on_rollback.empty()))
{
std::stringstream ss;
if (!parts_to_remove_on_rollback.empty())
{
ss << " Removing parts:";
for (const auto & part : parts_to_remove_on_rollback)
ss << " " << part->relative_path;
ss << ".";
}
if (!parts_to_add_on_rollback.empty())
{
ss << " Adding parts: ";
for (const auto & part : parts_to_add_on_rollback)
ss << " " << part->relative_path;
ss << ".";
}
LOG_DEBUG(data->log, "Undoing transaction." << ss.str());
data->replaceParts(parts_to_remove_on_rollback, parts_to_add_on_rollback, true);
clear();
}
}
}
......@@ -85,6 +85,8 @@ class MergeTreeData : public ITableDeclaration
public:
/// Function to call if the part is suspected to contain corrupt data.
using BrokenPartCallback = std::function<void (const String &)>;
/// Callback to delete outdated parts immediately
using PartsCleanCallback = std::function<void ()>;
using DataPart = MergeTreeDataPart;
using MutableDataPartPtr = std::shared_ptr<DataPart>;
......@@ -110,21 +112,7 @@ public:
clear();
}
void rollback()
{
if (data && (!parts_to_remove_on_rollback.empty() || !parts_to_add_on_rollback.empty()))
{
String parts_to_remove_str;
for (const auto & part : parts_to_remove_on_rollback)
parts_to_remove_str += part->relative_path + ", ";
LOG_DEBUG(data->log, "Undoing transaction. Removing parts: " + parts_to_remove_str);
data->replaceParts(parts_to_remove_on_rollback, parts_to_add_on_rollback, true);
clear();
}
}
void rollback();
~Transaction()
{
......@@ -198,12 +186,12 @@ public:
/// Merging mode. See above.
enum Mode
{
Ordinary = 0, /// Enum values are saved. Do not change them.
Collapsing = 1,
Ordinary = 0, /// Enum values are saved. Do not change them.
Collapsing = 1,
Summing = 2,
Aggregating = 3,
Unsorted = 4,
Replacing = 5,
Unsorted = 4,
Replacing = 5,
Graphite = 6,
};
......@@ -235,7 +223,7 @@ public:
/// index_granularity - how many rows correspond to one primary key value.
/// require_part_metadata - should checksums.txt and columns.txt exist in the part directory.
/// attach - whether the existing table is attached or the new table is created.
MergeTreeData( const String & database_, const String & table_,
MergeTreeData( const String & database_, const String & table_,
const String & full_path_, NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
......@@ -250,7 +238,9 @@ public:
const String & log_name_,
bool require_part_metadata_,
bool attach,
BrokenPartCallback broken_part_callback_ = [](const String &){});
BrokenPartCallback broken_part_callback_ = [](const String &){},
PartsCleanCallback parts_clean_callback_ = nullptr
);
/// Load the set of data parts from disk. Call once - immediately after the object is created.
void loadDataParts(bool skip_sanity_checks);
......@@ -401,6 +391,12 @@ public:
broken_part_callback(name);
}
/// Delete old parts from disk and ZooKeeper (in replicated case)
void clearOldPartsAndRemoveFromZK()
{
parts_clean_callback();
}
ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; }
SortDescription getSortDescription() const { return sort_descr; }
......@@ -506,7 +502,10 @@ private:
/// Current column sizes in compressed and uncompressed form.
ColumnSizes column_sizes;
/// Engine-specific methods
BrokenPartCallback broken_part_callback;
/// Use to delete outdated parts immediately from memory, disk and ZooKeeper
PartsCleanCallback parts_clean_callback;
String log_name;
Logger * log;
......
......@@ -487,7 +487,7 @@ void MergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_n
Poco::File(from).list(files);
LOG_WARNING(storage.log, "Part directory " << to << " already exists"
<< " and contains " + toString(files.size()) + " files. Remove it.");
<< " and contains " << files.size() << " files. Removing it.");
to_file.remove(true);
}
......
......@@ -44,7 +44,7 @@ void ReplicatedMergeTreeCleanupThread::run()
void ReplicatedMergeTreeCleanupThread::iterate()
{
clearOldParts();
storage.clearOldPartsAndRemoveFromZK(log);
storage.data.clearOldTemporaryDirectories();
if (storage.unreplicated_data)
......@@ -61,46 +61,6 @@ void ReplicatedMergeTreeCleanupThread::iterate()
}
void ReplicatedMergeTreeCleanupThread::clearOldParts()
{
auto table_lock = storage.lockStructure(false);
auto zookeeper = storage.getZooKeeper();
MergeTreeData::DataPartsVector parts = storage.data.grabOldParts();
size_t count = parts.size();
if (!count)
return;
try
{
while (!parts.empty())
{
MergeTreeData::DataPartPtr & part = parts.back();
LOG_DEBUG(log, "Removing " << part->name);
zkutil::Ops ops;
storage.removePartFromZooKeeper(part->name, ops);
auto code = zookeeper->tryMulti(ops);
if (code != ZOK)
LOG_WARNING(log, "Couldn't remove " << part->name << " from ZooKeeper: " << zkutil::ZooKeeper::error2string(code));
part->remove();
parts.pop_back();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
storage.data.addOldParts(parts);
throw;
}
LOG_DEBUG(log, "Removed " << count << " old parts");
}
void ReplicatedMergeTreeCleanupThread::clearOldLogs()
{
auto zookeeper = storage.getZooKeeper();
......
......@@ -31,9 +31,6 @@ private:
void run();
void iterate();
/// Delete old chunks from disk and from ZooKeeper.
void clearOldParts();
/// Remove old records from ZooKeeper.
void clearOldLogs();
......
......@@ -225,7 +225,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
}
catch (...)
{
storage.replica_is_active_node = nullptr;
storage.replica_is_active_node = nullptr;
storage.leader_election = nullptr;
try
......
......@@ -221,7 +221,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, merging_params_,
settings_, database_name_ + "." + table_name, true, attach,
[this] (const std::string & name) { enqueuePartForCheck(name); }),
[this] (const std::string & name) { enqueuePartForCheck(name); },
[this] () { clearOldPartsAndRemoveFromZK(); }
),
reader(data), writer(data, context), merger(data, context.getBackgroundPool()), fetcher(data), sharded_partition_uploader_client(*this),
shutdown_event(false), part_check_thread(*this),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
......@@ -3960,4 +3962,49 @@ bool StorageReplicatedMergeTree::checkSpaceForResharding(const ReplicaToSpaceInf
return true;
}
void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK(Logger * log_)
{
/// Critical section is not required (since grabOldParts() returns unique part set on each call)
Logger * log = log_ ? log_ : this->log;
auto table_lock = lockStructure(false);
auto zookeeper = getZooKeeper();
MergeTreeData::DataPartsVector parts = data.grabOldParts();
size_t count = parts.size();
if (!count)
return;
try
{
while (!parts.empty())
{
MergeTreeData::DataPartPtr & part = parts.back();
LOG_DEBUG(log, "Removing " << part->name);
zkutil::Ops ops;
removePartFromZooKeeper(part->name, ops);
auto code = zookeeper->tryMulti(ops);
if (code != ZOK)
LOG_WARNING(log, "Couldn't remove " << part->name << " from ZooKeeper: " << zkutil::ZooKeeper::error2string(code));
part->remove();
parts.pop_back();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
data.addOldParts(parts);
throw;
}
LOG_DEBUG(log, "Removed " << count << " old parts");
}
}
......@@ -206,6 +206,9 @@ public:
private:
void dropUnreplicatedPartition(const Field & partition, bool detach, const Settings & settings);
/// Delete old chunks from disk and from ZooKeeper.
void clearOldPartsAndRemoveFromZK(Logger * log_ = nullptr);
friend class ReplicatedMergeTreeBlockOutputStream;
friend class ReplicatedMergeTreeRestartingThread;
friend class ReplicatedMergeTreePartCheckThread;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册