提交 61233b88 编写于 作者: V Vitaliy Lyudvichenko

Better system.part_log. [#CLICKHOUSE-3342]

上级 9965f5e3
...@@ -365,6 +365,7 @@ namespace ErrorCodes ...@@ -365,6 +365,7 @@ namespace ErrorCodes
extern const int NO_COMMON_TYPE = 386; extern const int NO_COMMON_TYPE = 386;
extern const int EXTERNAL_LOADABLE_ALREADY_EXISTS = 387; extern const int EXTERNAL_LOADABLE_ALREADY_EXISTS = 387;
extern const int CANNOT_ASSIGN_OPTIMIZE = 388; extern const int CANNOT_ASSIGN_OPTIMIZE = 388;
extern const int INSERT_WAS_DEDUPLICATED = 389;
extern const int KEEPER_EXCEPTION = 999; extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000; extern const int POCO_EXCEPTION = 1000;
......
...@@ -58,7 +58,7 @@ private: ...@@ -58,7 +58,7 @@ private:
void fillMaps(); void fillMaps();
public: public:
DataTypeEnum(const Values & values_); explicit DataTypeEnum(const Values & values_);
DataTypeEnum(const DataTypeEnum & other); DataTypeEnum(const DataTypeEnum & other);
const Values & getValues() const { return values; } const Values & getValues() const { return values; }
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <DataTypes/DataTypeDateTime.h> #include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate.h> #include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeEnum.h>
#include <Storages/MergeTree/MergeTreeDataPart.h> #include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/PartLog.h> #include <Interpreters/PartLog.h>
...@@ -16,21 +17,38 @@ namespace DB ...@@ -16,21 +17,38 @@ namespace DB
Block PartLogElement::createBlock() Block PartLogElement::createBlock()
{ {
auto event_type_datatype = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values{
{"NEW_PART", static_cast<Int8>(NEW_PART)},
{"MERGE_PARTS", static_cast<Int8>(MERGE_PARTS)},
{"DOWNLOAD_PART", static_cast<Int8>(DOWNLOAD_PART)},
{"REMOVE_PART", static_cast<Int8>(REMOVE_PART)}
}
);
return return
{ {
{ColumnUInt8::create(), std::make_shared<DataTypeUInt8>(), "event_type"}, {ColumnInt8::create(), std::move(event_type_datatype), "event_type"},
{ColumnUInt16::create(), std::make_shared<DataTypeDate>(), "event_date"}, {ColumnUInt16::create(), std::make_shared<DataTypeDate>(), "event_date"},
{ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(), "event_time"}, {ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(), "event_time"},
{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "size_in_bytes"},
{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "duration_ms"}, {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "duration_ms"},
{ColumnString::create(), std::make_shared<DataTypeString>(), "database"}, {ColumnString::create(), std::make_shared<DataTypeString>(), "database"},
{ColumnString::create(), std::make_shared<DataTypeString>(), "table"}, {ColumnString::create(), std::make_shared<DataTypeString>(), "table"},
{ColumnString::create(), std::make_shared<DataTypeString>(), "part_name"}, {ColumnString::create(), std::make_shared<DataTypeString>(), "part_name"},
{ColumnArray::create(ColumnString::create()),
std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "merged_from"}, {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "rows"},
{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "size_in_bytes"}, // On disk
/// Merge-specific info
{ColumnArray::create(ColumnString::create()), std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "merged_from"},
{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "bytes_uncompressed"}, // Result bytes
{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "read_rows"},
{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "read_bytes"},
/// Is there an error during the execution or commit
{ColumnUInt16::create(), std::make_shared<DataTypeUInt16>(), "error"},
{ColumnString::create(), std::make_shared<DataTypeString>(), "exception"},
}; };
} }
...@@ -40,41 +58,71 @@ void PartLogElement::appendToBlock(Block & block) const ...@@ -40,41 +58,71 @@ void PartLogElement::appendToBlock(Block & block) const
size_t i = 0; size_t i = 0;
columns[i++]->insert(UInt64(event_type)); columns[i++]->insert(Int64(event_type));
columns[i++]->insert(UInt64(DateLUT::instance().toDayNum(event_time))); columns[i++]->insert(UInt64(DateLUT::instance().toDayNum(event_time)));
columns[i++]->insert(UInt64(event_time)); columns[i++]->insert(UInt64(event_time));
columns[i++]->insert(UInt64(size_in_bytes));
columns[i++]->insert(UInt64(duration_ms)); columns[i++]->insert(UInt64(duration_ms));
columns[i++]->insert(database_name); columns[i++]->insert(database_name);
columns[i++]->insert(table_name); columns[i++]->insert(table_name);
columns[i++]->insert(part_name); columns[i++]->insert(part_name);
Array merged_from_array; columns[i++]->insert(UInt64(rows));
merged_from_array.reserve(merged_from.size()); columns[i++]->insert(UInt64(bytes_compressed_on_disk));
for (const auto & name : merged_from)
merged_from_array.push_back(name); Array source_part_names_array;
source_part_names_array.reserve(source_part_names.size());
for (const auto & name : source_part_names)
source_part_names_array.push_back(name);
columns[i++]->insert(source_part_names_array);
columns[i++]->insert(UInt64(bytes_uncompressed));
columns[i++]->insert(UInt64(rows_read));
columns[i++]->insert(UInt64(bytes_read_uncompressed));
columns[i++]->insert(merged_from_array); columns[i++]->insert(UInt64(error));
columns[i++]->insert(exception);
block.setColumns(std::move(columns)); block.setColumns(std::move(columns));
} }
void PartLog::addNewPart(const MergeTreeDataPart & part, double elapsed)
bool PartLog::addNewPartToTheLog(Context & context, const MergeTreeDataPart & part, UInt64 elapsed_ns, const ExecutionStatus & execution_status)
{ {
PartLog * part_log = nullptr;
try
{
part_log = context.getPartLog(part.storage.getDatabaseName(), part.storage.getTableName());
if (!part_log)
return false;
PartLogElement elem; PartLogElement elem;
elem.event_time = time(nullptr);
elem.event_type = PartLogElement::NEW_PART; elem.event_type = PartLogElement::NEW_PART;
elem.size_in_bytes = part.size_in_bytes; elem.event_time = time(nullptr);
elem.duration_ms = elapsed / 1000000; elem.duration_ms = elapsed_ns / 1000000;
elem.database_name = part.storage.getDatabaseName(); elem.database_name = part.storage.getDatabaseName();
elem.table_name = part.storage.getTableName(); elem.table_name = part.storage.getTableName();
elem.part_name = part.name; elem.part_name = part.name;
add(elem); elem.bytes_compressed_on_disk = part.size_in_bytes;
elem.rows = part.rows_count;
elem.error = static_cast<UInt16>(execution_status.code);
elem.exception = execution_status.message;
part_log->add(elem);
}
catch (...)
{
tryLogCurrentException(part_log ? part_log->log : &Logger::get("PartLog"), __PRETTY_FUNCTION__);
return false;
}
return true;
} }
} }
...@@ -18,15 +18,28 @@ struct PartLogElement ...@@ -18,15 +18,28 @@ struct PartLogElement
Type event_type = NEW_PART; Type event_type = NEW_PART;
time_t event_time{}; time_t event_time = 0;
UInt64 duration_ms = 0;
UInt64 size_in_bytes{};
UInt64 duration_ms{};
String database_name; String database_name;
String table_name; String table_name;
String part_name; String part_name;
Strings merged_from;
/// Size of the part
UInt64 rows = 0;
/// Size of files in filesystem
UInt64 bytes_compressed_on_disk = 0;
//// Make sense for Merges
Strings source_part_names;
UInt64 bytes_uncompressed = 0;
UInt64 rows_read = 0;
UInt64 bytes_read_uncompressed = 0;
/// Is the operation was successful?
UInt16 error = 0;
String exception;
static std::string name() { return "PartLog"; } static std::string name() { return "PartLog"; }
...@@ -44,7 +57,8 @@ class PartLog : public SystemLog<PartLogElement> ...@@ -44,7 +57,8 @@ class PartLog : public SystemLog<PartLogElement>
public: public:
/// Add a record about creation of new part. /// Add a record about creation of new part.
void addNewPart(const MergeTreeDataPart & part, double elapsed); static bool addNewPartToTheLog(Context & context, const MergeTreeDataPart & part, UInt64 elapsed_ns,
const ExecutionStatus & execution_status = {});
}; };
} }
...@@ -101,7 +101,7 @@ public: ...@@ -101,7 +101,7 @@ public:
LOG_ERROR(log, "SystemLog queue is full"); LOG_ERROR(log, "SystemLog queue is full");
} }
private: protected:
Context & context; Context & context;
const String database_name; const String database_name;
const String table_name; const String table_name;
......
...@@ -18,8 +18,7 @@ void MergeTreeBlockOutputStream::write(const Block & block) ...@@ -18,8 +18,7 @@ void MergeTreeBlockOutputStream::write(const Block & block)
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block); MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block);
storage.data.renameTempPartAndAdd(part, &storage.increment); storage.data.renameTempPartAndAdd(part, &storage.increment);
if (auto part_log = storage.context.getPartLog(part->storage.getDatabaseName(), part->storage.getTableName())) PartLog::addNewPartToTheLog(storage.context, *part, watch.elapsed());
part_log->addNewPart(*part, watch.elapsed());
/// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'. /// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'.
storage.merge_task_handle->wake(); storage.merge_task_handle->wake();
......
...@@ -47,6 +47,7 @@ ...@@ -47,6 +47,7 @@
#include <typeinfo> #include <typeinfo>
#include <typeindex> #include <typeindex>
#include <optional> #include <optional>
#include <Interpreters/PartLog.h>
namespace ProfileEvents namespace ProfileEvents
...@@ -656,6 +657,7 @@ void MergeTreeData::rollbackDeletingParts(const MergeTreeData::DataPartsVector & ...@@ -656,6 +657,7 @@ void MergeTreeData::rollbackDeletingParts(const MergeTreeData::DataPartsVector &
void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & parts) void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & parts)
{ {
{
std::lock_guard<std::mutex> lock(data_parts_mutex); std::lock_guard<std::mutex> lock(data_parts_mutex);
/// TODO: use data_parts iterators instead of pointers /// TODO: use data_parts iterators instead of pointers
...@@ -669,6 +671,30 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa ...@@ -669,6 +671,30 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
data_parts_indexes.erase(it); data_parts_indexes.erase(it);
} }
}
/// Data parts is still alive (since DataPartsVector holds shared_ptrs) and contain useful metainformation for logging
/// NOTE: There is no need to log parts deletion somewhere else, all deleting parts pass through this function and pass away
if (auto part_log = context.getPartLog(database_name, table_name))
{
PartLogElement part_log_elem;
part_log_elem.event_type = PartLogElement::REMOVE_PART;
part_log_elem.event_time = time(nullptr);
part_log_elem.duration_ms = 0;
part_log_elem.database_name = database_name;
part_log_elem.table_name = table_name;
for (auto & part : parts)
{
part_log_elem.part_name = part->name;
part_log_elem.bytes_compressed_on_disk = part->size_in_bytes;
part_log_elem.rows = part->rows_count;
part_log->add(part_log_elem);
}
}
} }
void MergeTreeData::clearOldPartsFromFilesystem() void MergeTreeData::clearOldPartsFromFilesystem()
......
...@@ -130,7 +130,7 @@ struct MergeTreeDataPart ...@@ -130,7 +130,7 @@ struct MergeTreeDataPart
String name; String name;
MergeTreePartInfo info; MergeTreePartInfo info;
/// A directory path (realative to storage's path) where part data is actually stored /// A directory path (relative to storage's path) where part data is actually stored
/// Examples: 'detached/tmp_fetch_<name>', 'tmp_<name>', '<name>' /// Examples: 'detached/tmp_fetch_<name>', 'tmp_<name>', '<name>'
mutable String relative_path; mutable String relative_path;
......
...@@ -20,6 +20,7 @@ namespace ErrorCodes ...@@ -20,6 +20,7 @@ namespace ErrorCodes
extern const int NO_ZOOKEEPER; extern const int NO_ZOOKEEPER;
extern const int READONLY; extern const int READONLY;
extern const int UNKNOWN_STATUS_OF_INSERT; extern const int UNKNOWN_STATUS_OF_INSERT;
extern const int INSERT_WAS_DEDUPLICATED;
} }
...@@ -141,10 +142,21 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) ...@@ -141,10 +142,21 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
LOG_DEBUG(log, "Wrote block with " << block.rows() << " rows"); LOG_DEBUG(log, "Wrote block with " << block.rows() << " rows");
} }
try
{
commitPart(zookeeper, part, block_id); commitPart(zookeeper, part, block_id);
if (auto part_log = storage.context.getPartLog(part->storage.getDatabaseName(), part->storage.getTableName())) /// Set a special error code if the block is duplicate
part_log->addNewPart(*part, watch.elapsed()); int error = (deduplicate && last_block_is_duplicate) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
PartLog::addNewPartToTheLog(storage.context, *part, watch.elapsed(), ExecutionStatus(error));
}
catch (...)
{
PartLog::addNewPartToTheLog(storage.context, *part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__));
throw;
}
} }
} }
...@@ -163,10 +175,16 @@ void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::Muta ...@@ -163,10 +175,16 @@ void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::Muta
Stopwatch watch; Stopwatch watch;
try
{
commitPart(zookeeper, part, ""); commitPart(zookeeper, part, "");
PartLog::addNewPartToTheLog(storage.context, *part, watch.elapsed());
if (auto part_log = storage.context.getPartLog(part->storage.getDatabaseName(), part->storage.getTableName())) }
part_log->addNewPart(*part, watch.elapsed()); catch (...)
{
PartLog::addNewPartToTheLog(storage.context, *part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__));
throw;
}
} }
......
...@@ -330,45 +330,66 @@ bool StorageMergeTree::merge( ...@@ -330,45 +330,66 @@ bool StorageMergeTree::merge(
merging_tagger.emplace(future_part.parts, MergeTreeDataMerger::estimateDiskSpaceForMerge(future_part.parts), *this); merging_tagger.emplace(future_part.parts, MergeTreeDataMerger::estimateDiskSpaceForMerge(future_part.parts), *this);
} }
MergeList::EntryPtr merge_entry_ptr = context.getMergeList().insert(database_name, table_name, future_part.name, future_part.parts); MergeList::EntryPtr merge_entry = context.getMergeList().insert(database_name, table_name, future_part.name, future_part.parts);
/// Logging /// Logging
Stopwatch stopwatch; Stopwatch stopwatch;
MergeTreeData::MutableDataPartPtr new_part;
auto new_part = merger.mergePartsToTemporaryPart( auto write_part_log = [&] (const ExecutionStatus & execution_status)
future_part, *merge_entry_ptr, aio_threshold, time(nullptr), merging_tagger->reserved_space.get(), deduplicate); {
try
{
auto part_log = context.getPartLog(database_name, table_name);
if (!part_log)
return;
merger.renameMergedTemporaryPart(new_part, future_part.parts, nullptr); PartLogElement part_log_elem;
if (auto part_log = context.getPartLog(database_name, table_name)) part_log_elem.event_type = PartLogElement::MERGE_PARTS;
{ part_log_elem.event_time = time(nullptr);
PartLogElement elem; part_log_elem.duration_ms = stopwatch.elapsed() / 1000000;
elem.event_time = time(nullptr);
part_log_elem.database_name = database_name;
part_log_elem.table_name = table_name;
part_log_elem.part_name = future_part.name;
elem.merged_from.reserve(future_part.parts.size()); if (new_part)
for (const auto & part : future_part.parts) part_log_elem.bytes_compressed_on_disk = new_part->size_in_bytes;
elem.merged_from.push_back(part->name);
elem.event_type = PartLogElement::MERGE_PARTS;
elem.size_in_bytes = new_part->size_in_bytes;
elem.database_name = new_part->storage.getDatabaseName(); part_log_elem.source_part_names.reserve(future_part.parts.size());
elem.table_name = new_part->storage.getTableName(); for (const auto & source_part : future_part.parts)
elem.part_name = new_part->name; part_log_elem.source_part_names.push_back(source_part->name);
elem.duration_ms = stopwatch.elapsed() / 1000000; part_log_elem.rows_read = (*merge_entry)->bytes_read_uncompressed;
part_log_elem.bytes_read_uncompressed = (*merge_entry)->bytes_read_uncompressed;
part_log->add(elem); part_log_elem.rows = (*merge_entry)->rows_written;
part_log_elem.bytes_uncompressed = (*merge_entry)->bytes_written_uncompressed;
elem.duration_ms = 0; part_log_elem.error = static_cast<UInt16>(execution_status.code);
elem.event_type = PartLogElement::REMOVE_PART; part_log_elem.exception = execution_status.message;
elem.merged_from = Strings();
for (const auto & part : future_part.parts) part_log->add(part_log_elem);
}
catch (...)
{ {
elem.part_name = part->name; tryLogCurrentException(log, __PRETTY_FUNCTION__);
elem.size_in_bytes = part->size_in_bytes;
part_log->add(elem);
} }
};
try
{
new_part = merger.mergePartsToTemporaryPart(future_part, *merge_entry, aio_threshold, time(nullptr),
merging_tagger->reserved_space.get(), deduplicate);
merger.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
write_part_log({});
}
catch (...)
{
write_part_log(ExecutionStatus::fromCurrentException());
throw;
} }
return true; return true;
......
...@@ -980,12 +980,33 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) ...@@ -980,12 +980,33 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
} }
bool do_fetch = false; bool do_fetch = false;
if (entry.type == LogEntry::GET_PART) if (entry.type == LogEntry::GET_PART)
{ {
do_fetch = true; do_fetch = true;
} }
else if (entry.type == LogEntry::MERGE_PARTS) else if (entry.type == LogEntry::MERGE_PARTS)
{
tryExecuteMerge(entry, do_fetch);
}
else
{
throw Exception("Unexpected log entry type: " + toString(static_cast<int>(entry.type)));
}
if (do_fetch)
return executeFetch(entry);
return true;
}
void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTree::LogEntry & entry, bool & do_fetch)
{
/// The caller has already decided to make the fetch
if (do_fetch)
return;
// Log source part names just in case
{ {
std::stringstream log_message; std::stringstream log_message;
log_message << "Executing log entry to merge parts "; log_message << "Executing log entry to merge parts ";
...@@ -994,6 +1015,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) ...@@ -994,6 +1015,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
log_message << " to " << entry.new_part_name; log_message << " to " << entry.new_part_name;
LOG_TRACE(log, log_message.rdbuf()); LOG_TRACE(log, log_message.rdbuf());
}
MergeTreeData::DataPartsVector parts; MergeTreeData::DataPartsVector parts;
bool have_all_parts = true; bool have_all_parts = true;
...@@ -1036,13 +1058,16 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) ...@@ -1036,13 +1058,16 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
if (!replica.empty()) if (!replica.empty())
{ {
do_fetch = true; do_fetch = true;
LOG_DEBUG(log, "Preffering to fetch " << entry.new_part_name << " from replica"); LOG_DEBUG(log, "Prefer to fetch " << entry.new_part_name << " from replica " << replica);
} }
} }
} }
if (!do_fetch) if (do_fetch)
{ return;
/// Start to make the main work
size_t estimated_space_for_merge = MergeTreeDataMerger::estimateDiskSpaceForMerge(parts); size_t estimated_space_for_merge = MergeTreeDataMerger::estimateDiskSpaceForMerge(parts);
/// Can throw an exception. /// Can throw an exception.
...@@ -1054,17 +1079,64 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) ...@@ -1054,17 +1079,64 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
MergeTreeData::Transaction transaction; MergeTreeData::Transaction transaction;
size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io; size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io;
MergeTreeDataMerger::FuturePart future_merged_part(parts);
if (future_merged_part.name != entry.new_part_name)
{
throw Exception("Future merged part name `" + future_merged_part.name + "` differs from part name in log entry: `"
+ entry.new_part_name + "`", ErrorCodes::BAD_DATA_PART_NAME);
}
/// Logging /// Logging
Stopwatch stopwatch; Stopwatch stopwatch;
ExecutionStatus execution_status;
MergeTreeData::MutableDataPartPtr part;
MergeTreeDataMerger::FuturePart future_merged_part(parts); auto write_part_log = [&] (const ExecutionStatus & execution_status)
if (future_merged_part.name != entry.new_part_name) {
throw Exception( try
"Future merged part name `" + future_merged_part.name + {
"` differs from part name in log entry: `" + entry.new_part_name + "`", auto part_log = context.getPartLog(database_name, table_name);
ErrorCodes::BAD_DATA_PART_NAME); if (!part_log)
return;
PartLogElement part_log_elem;
part_log_elem.event_type = PartLogElement::MERGE_PARTS;
part_log_elem.event_time = time(nullptr);
/// TODO: Stop stopwatch in outer code to exclude ZK timings and so on
part_log_elem.duration_ms = stopwatch.elapsed() / 1000000;
part_log_elem.database_name = database_name;
part_log_elem.table_name = table_name;
part_log_elem.part_name = entry.new_part_name;
if (part)
part_log_elem.bytes_compressed_on_disk = part->size_in_bytes;
part_log_elem.source_part_names.reserve(parts.size());
for (const auto & source_part : parts)
part_log_elem.source_part_names.push_back(source_part->name);
part_log_elem.rows_read = (*merge_entry)->bytes_read_uncompressed;
part_log_elem.bytes_read_uncompressed = (*merge_entry)->bytes_read_uncompressed;
part_log_elem.rows = (*merge_entry)->rows_written;
part_log_elem.bytes_uncompressed = (*merge_entry)->bytes_written_uncompressed;
part_log_elem.error = static_cast<UInt16>(execution_status.code);
part_log_elem.exception = execution_status.message;
part_log->add(part_log_elem);
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
};
auto part = merger.mergePartsToTemporaryPart( try
{
part = merger.mergePartsToTemporaryPart(
future_merged_part, *merge_entry, aio_threshold, entry.create_time, reserved_space.get(), entry.deduplicate); future_merged_part, *merge_entry, aio_threshold, entry.create_time, reserved_space.get(), entry.deduplicate);
zkutil::Ops ops; zkutil::Ops ops;
...@@ -1098,13 +1170,14 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) ...@@ -1098,13 +1170,14 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
"7. Manual modification of source data after server startup. " "7. Manual modification of source data after server startup. "
"8. Manual modification of checksums stored in ZooKeeper. " "8. Manual modification of checksums stored in ZooKeeper. "
"We will download merged part from replica to force byte-identical result."); "We will download merged part from replica to force byte-identical result.");
write_part_log(ExecutionStatus::fromCurrentException());
return;
} }
else
throw; throw;
} }
if (!do_fetch)
{
merger.renameMergedTemporaryPart(part, parts, &transaction); merger.renameMergedTemporaryPart(part, parts, &transaction);
/// Do not commit if the part is obsolete /// Do not commit if the part is obsolete
...@@ -1121,49 +1194,20 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) ...@@ -1121,49 +1194,20 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
* This is not a problem, because in this case the merge will remain in the queue, and we will try again. * This is not a problem, because in this case the merge will remain in the queue, and we will try again.
*/ */
merge_selecting_event.set(); merge_selecting_event.set();
if (auto part_log = context.getPartLog(database_name, table_name))
{
PartLogElement elem;
elem.event_time = time(nullptr);
elem.merged_from.reserve(parts.size());
for (const auto & part : parts)
elem.merged_from.push_back(part->name);
elem.event_type = PartLogElement::MERGE_PARTS;
elem.size_in_bytes = part->size_in_bytes;
elem.database_name = part->storage.getDatabaseName();
elem.table_name = part->storage.getTableName();
elem.part_name = part->name;
elem.duration_ms = stopwatch.elapsed() / 1000000;
part_log->add(elem);
elem.duration_ms = 0;
elem.event_type = PartLogElement::REMOVE_PART;
elem.merged_from = Strings();
for (const auto & part : parts)
{
elem.part_name = part->name;
elem.size_in_bytes = part->size_in_bytes;
part_log->add(elem);
}
}
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges); ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
write_part_log({});
} }
} catch (...)
}
else
{ {
throw Exception("Unexpected log entry type: " + toString(static_cast<int>(entry.type))); write_part_log(ExecutionStatus::fromCurrentException());
throw;
} }
}
if (do_fetch)
{ bool StorageReplicatedMergeTree::executeFetch(const StorageReplicatedMergeTree::LogEntry & entry)
{
String replica = findReplicaHavingCoveringPart(entry, true); String replica = findReplicaHavingCoveringPart(entry, true);
static std::atomic_uint total_fetches {0}; static std::atomic_uint total_fetches {0};
...@@ -1354,7 +1398,6 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) ...@@ -1354,7 +1398,6 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
throw; throw;
} }
}
return true; return true;
} }
...@@ -2148,12 +2191,56 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin ...@@ -2148,12 +2191,56 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
table_lock = lockStructure(true, __PRETTY_FUNCTION__); table_lock = lockStructure(true, __PRETTY_FUNCTION__);
ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host")); ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef());
/// Logging
Stopwatch stopwatch; Stopwatch stopwatch;
MergeTreeData::MutableDataPartPtr part;
MergeTreeData::DataPartsVector replaced_parts;
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()); auto write_part_log = [&] (const ExecutionStatus & execution_status)
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart( {
part_name, replica_path, address.host, address.replication_port, timeouts, to_detached); try
{
auto part_log = context.getPartLog(database_name, table_name);
if (!part_log)
return;
PartLogElement part_log_elem;
part_log_elem.event_time = time(nullptr);
part_log_elem.event_type = PartLogElement::DOWNLOAD_PART;
/// TODO: Stop stopwatch in outer code to exclude ZK timings and so on
part_log_elem.duration_ms = stopwatch.elapsed() / 10000000;
part_log_elem.database_name = database_name;
part_log_elem.table_name = table_name;
part_log_elem.part_name = part_name;
if (part)
{
part_log_elem.bytes_compressed_on_disk = part->size_in_bytes;
part_log_elem.rows = part->rows_count; /// Could be approximate (?)
}
part_log_elem.source_part_names.reserve(replaced_parts.size());
for (const auto & replaced_part : replaced_parts)
part_log_elem.source_part_names.push_back(replaced_part->name);
part_log_elem.error = static_cast<UInt16>(execution_status.code);
part_log_elem.exception = execution_status.message;
part_log->add(part_log_elem);
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
};
try
{
part = fetcher.fetchPart(part_name, replica_path, address.host, address.replication_port, timeouts, to_detached);
if (!to_detached) if (!to_detached)
{ {
...@@ -2167,7 +2254,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin ...@@ -2167,7 +2254,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
checkPartAndAddToZooKeeper(part, ops, part_name); checkPartAndAddToZooKeeper(part, ops, part_name);
MergeTreeData::Transaction transaction; MergeTreeData::Transaction transaction;
auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction); replaced_parts = data.renameTempPartAndReplace(part, nullptr, &transaction);
/// Do not commit if the part is obsolete /// Do not commit if the part is obsolete
if (!transaction.isEmpty()) if (!transaction.isEmpty())
...@@ -2176,37 +2263,6 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin ...@@ -2176,37 +2263,6 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
transaction.commit(); transaction.commit();
} }
if (auto part_log = context.getPartLog(database_name, table_name))
{
PartLogElement elem;
elem.event_time = time(nullptr);
elem.event_type = PartLogElement::DOWNLOAD_PART;
elem.size_in_bytes = part->size_in_bytes;
elem.duration_ms = stopwatch.elapsed() / 10000000;
elem.merged_from.reserve(removed_parts.size());
for (const auto & part : removed_parts)
{
elem.merged_from.push_back(part->name);
}
elem.database_name = part->storage.getDatabaseName();
elem.table_name = part->storage.getTableName();
elem.part_name = part->name;
part_log->add(elem);
elem.duration_ms = 0;
elem.event_type = PartLogElement::REMOVE_PART;
elem.merged_from = Strings();
for (const auto & part : removed_parts)
{
elem.part_name = part->name;
elem.size_in_bytes = part->size_in_bytes;
part_log->add(elem);
}
}
/** If a quorum is tracked for this part, you must update it. /** If a quorum is tracked for this part, you must update it.
* If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method. * If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method.
*/ */
...@@ -2215,16 +2271,26 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin ...@@ -2215,16 +2271,26 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
merge_selecting_event.set(); merge_selecting_event.set();
for (const auto & removed_part : removed_parts) for (const auto & replaced_part : replaced_parts)
{ {
LOG_DEBUG(log, "Part " << removed_part->name << " is rendered obsolete by fetching part " << part_name); LOG_DEBUG(log, "Part " << replaced_part->name << " is rendered obsolete by fetching part " << part_name);
ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts); ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
} }
write_part_log({});
} }
else else
{ {
part->renameTo("detached/" + part_name); part->renameTo("detached/" + part_name);
} }
}
catch (...)
{
if (!to_detached)
write_part_log(ExecutionStatus::fromCurrentException());
throw;
}
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches); ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
......
...@@ -342,6 +342,11 @@ private: ...@@ -342,6 +342,11 @@ private:
void executeDropRange(const LogEntry & entry); void executeDropRange(const LogEntry & entry);
/// Do the merge or recommend to make the fetch instead of the merge
void tryExecuteMerge(const LogEntry & entry, bool & do_fetch);
bool executeFetch(const LogEntry & entry);
void executeClearColumnInPartition(const LogEntry & entry); void executeClearColumnInPartition(const LogEntry & entry);
/** Updates the queue. /** Updates the queue.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册