提交 41fede9c 编写于 作者: A Alexey Milovidov

Fixed TSan report on shutdown

上级 0bbd5419
......@@ -244,10 +244,18 @@ struct ContextShared
return;
shutdown_called = true;
system_logs.reset();
{
std::lock_guard lock(mutex);
/** After this point, system logs will shutdown their threads and no longer write any data.
* It will prevent recreation of system tables at shutdown.
* Note that part changes at shutdown won't be logged to part log.
*/
system_logs.reset();
}
/** At this point, some tables may have threads that block our mutex.
* To complete them correctly, we will copy the current list of tables,
* To shutdown them correctly, we will copy the current list of tables,
* and ask them all to finish their work.
* Then delete all objects with tables.
*/
......@@ -259,6 +267,8 @@ struct ContextShared
current_databases = databases;
}
/// We still hold "databases" in Context (instead of std::move) for Buffer tables to flush data correctly.
for (auto & database : current_databases)
database.second->shutdown();
......@@ -1548,51 +1558,47 @@ Compiler & Context::getCompiler()
void Context::initializeSystemLogs()
{
auto lock = getLock();
if (!global_context)
throw Exception("Logical error: no global context for system logs", ErrorCodes::LOGICAL_ERROR);
shared->system_logs.emplace(*global_context, getConfigRef());
}
QueryLog * Context::getQueryLog()
std::shared_ptr<QueryLog> Context::getQueryLog()
{
auto lock = getLock();
if (!shared->system_logs || !shared->system_logs->query_log)
return nullptr;
return {};
return shared->system_logs->query_log.get();
return shared->system_logs->query_log;
}
QueryThreadLog * Context::getQueryThreadLog()
std::shared_ptr<QueryThreadLog> Context::getQueryThreadLog()
{
auto lock = getLock();
if (!shared->system_logs || !shared->system_logs->query_thread_log)
return nullptr;
return {};
return shared->system_logs->query_thread_log.get();
return shared->system_logs->query_thread_log;
}
PartLog * Context::getPartLog(const String & part_database)
std::shared_ptr<PartLog> Context::getPartLog(const String & part_database)
{
auto lock = getLock();
/// No part log or system logs are shutting down.
if (!shared->system_logs || !shared->system_logs->part_log)
return nullptr;
return {};
/// Will not log operations on system tables (including part_log itself).
/// It doesn't make sense and not allow to destruct PartLog correctly due to infinite logging and flushing,
/// and also make troubles on startup.
if (part_database == shared->system_logs->part_log_database)
return nullptr;
return {};
return shared->system_logs->part_log.get();
return shared->system_logs->part_log;
}
......
......@@ -402,12 +402,12 @@ public:
void initializeSystemLogs();
/// Nullptr if the query log is not ready for this moment.
QueryLog * getQueryLog();
QueryThreadLog * getQueryThreadLog();
std::shared_ptr<QueryLog> getQueryLog();
std::shared_ptr<QueryThreadLog> getQueryThreadLog();
/// Returns an object used to log opertaions with parts if it possible.
/// Provide table name to make required cheks.
PartLog * getPartLog(const String & part_database);
std::shared_ptr<PartLog> getPartLog(const String & part_database);
const MergeTreeSettings & getMergeTreeSettings() const;
......
......@@ -104,7 +104,7 @@ bool PartLog::addNewParts(Context & current_context, const PartLog::MutableDataP
if (parts.empty())
return true;
PartLog * part_log = nullptr;
std::shared_ptr<PartLog> part_log;
try
{
......
......@@ -16,7 +16,7 @@ constexpr size_t DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS = 7500;
/// Creates a system log with MergeTree engine using parameters from config
template <typename TSystemLog>
std::unique_ptr<TSystemLog> createSystemLog(
std::shared_ptr<TSystemLog> createSystemLog(
Context & context,
const String & default_database_name,
const String & default_table_name,
......@@ -33,7 +33,7 @@ std::unique_ptr<TSystemLog> createSystemLog(
size_t flush_interval_milliseconds = config.getUInt64(config_prefix + ".flush_interval_milliseconds", DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS);
return std::make_unique<TSystemLog>(context, database, table, engine, flush_interval_milliseconds);
return std::make_shared<TSystemLog>(context, database, table, engine, flush_interval_milliseconds);
}
}
......@@ -49,6 +49,14 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
}
SystemLogs::~SystemLogs() = default;
SystemLogs::~SystemLogs()
{
if (query_log)
query_log->shutdown();
if (query_thread_log)
query_thread_log->shutdown();
if (part_log)
part_log->shutdown();
}
}
#pragma once
#include <thread>
#include <atomic>
#include <boost/noncopyable.hpp>
#include <common/logger_useful.h>
#include <Core/Types.h>
......@@ -66,9 +67,9 @@ struct SystemLogs
SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config);
~SystemLogs();
std::unique_ptr<QueryLog> query_log; /// Used to log queries.
std::unique_ptr<QueryThreadLog> query_thread_log; /// Used to log query threads.
std::unique_ptr<PartLog> part_log; /// Used to log operations with parts
std::shared_ptr<QueryLog> query_log; /// Used to log queries.
std::shared_ptr<QueryThreadLog> query_thread_log; /// Used to log query threads.
std::shared_ptr<PartLog> part_log; /// Used to log operations with parts
String part_log_database;
};
......@@ -78,7 +79,6 @@ template <typename LogElement>
class SystemLog : private boost::noncopyable
{
public:
using Self = SystemLog;
/** Parameter: table name where to write log.
......@@ -103,13 +103,23 @@ public:
*/
void add(const LogElement & element)
{
if (is_shutdown)
return;
/// Without try we could block here in case of queue overflow.
if (!queue.tryPush({false, element}))
LOG_ERROR(log, "SystemLog queue is full");
}
/// Flush data in the buffer to disk
void flush(bool quiet = false);
void flush()
{
if (!is_shutdown)
flushImpl(false);
}
/// Stop the background flush thread before destructor. No more data will be written.
void shutdown();
protected:
Context & context;
......@@ -118,6 +128,7 @@ protected:
const String storage_def;
StoragePtr table;
const size_t flush_interval_milliseconds;
std::atomic<bool> is_shutdown{false};
using QueueItem = std::pair<bool, LogElement>; /// First element is shutdown flag for thread.
......@@ -145,6 +156,8 @@ protected:
*/
bool is_prepared = false;
void prepareTable();
void flushImpl(bool quiet);
};
......@@ -166,14 +179,25 @@ SystemLog<LogElement>::SystemLog(Context & context_,
template <typename LogElement>
SystemLog<LogElement>::~SystemLog()
void SystemLog<LogElement>::shutdown()
{
bool old_val = false;
if (!is_shutdown.compare_exchange_strong(old_val, true))
return;
/// Tell thread to shutdown.
queue.push({true, {}});
saving_thread.join();
}
template <typename LogElement>
SystemLog<LogElement>::~SystemLog()
{
shutdown();
}
template <typename LogElement>
void SystemLog<LogElement>::threadFunction()
{
......@@ -236,7 +260,7 @@ void SystemLog<LogElement>::threadFunction()
if (milliseconds_elapsed >= flush_interval_milliseconds)
{
/// Write data to a table.
flush(true);
flushImpl(true);
time_after_last_write.restart();
}
}
......@@ -251,7 +275,7 @@ void SystemLog<LogElement>::threadFunction()
template <typename LogElement>
void SystemLog<LogElement>::flush(bool quiet)
void SystemLog<LogElement>::flushImpl(bool quiet)
{
std::unique_lock lock(data_mutex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册