Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
b919876a
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
b919876a
编写于
3月 22, 2019
作者:
A
alexey-milovidov
提交者:
GitHub
3月 22, 2019
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #4758 from yandex/tsan-system-log-fix-race-on-shutdown
Fixed TSan report on shutdown
上级
6e2d2344
41fede9c
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
69 addition
and
31 deletion
+69
-31
dbms/src/Interpreters/Context.cpp
dbms/src/Interpreters/Context.cpp
+22
-16
dbms/src/Interpreters/Context.h
dbms/src/Interpreters/Context.h
+3
-3
dbms/src/Interpreters/PartLog.cpp
dbms/src/Interpreters/PartLog.cpp
+1
-1
dbms/src/Interpreters/SystemLog.cpp
dbms/src/Interpreters/SystemLog.cpp
+11
-3
dbms/src/Interpreters/SystemLog.h
dbms/src/Interpreters/SystemLog.h
+32
-8
未找到文件。
dbms/src/Interpreters/Context.cpp
浏览文件 @
b919876a
...
...
@@ -244,10 +244,18 @@ struct ContextShared
return
;
shutdown_called
=
true
;
system_logs
.
reset
();
{
std
::
lock_guard
lock
(
mutex
);
/** After this point, system logs will shutdown their threads and no longer write any data.
* It will prevent recreation of system tables at shutdown.
* Note that part changes at shutdown won't be logged to part log.
*/
system_logs
.
reset
();
}
/** At this point, some tables may have threads that block our mutex.
* To
complete
them correctly, we will copy the current list of tables,
* To
shutdown
them correctly, we will copy the current list of tables,
* and ask them all to finish their work.
* Then delete all objects with tables.
*/
...
...
@@ -259,6 +267,8 @@ struct ContextShared
current_databases
=
databases
;
}
/// We still hold "databases" in Context (instead of std::move) for Buffer tables to flush data correctly.
for
(
auto
&
database
:
current_databases
)
database
.
second
->
shutdown
();
...
...
@@ -1548,51 +1558,47 @@ Compiler & Context::getCompiler()
void
Context
::
initializeSystemLogs
()
{
auto
lock
=
getLock
();
if
(
!
global_context
)
throw
Exception
(
"Logical error: no global context for system logs"
,
ErrorCodes
::
LOGICAL_ERROR
);
shared
->
system_logs
.
emplace
(
*
global_context
,
getConfigRef
());
}
QueryLog
*
Context
::
getQueryLog
()
std
::
shared_ptr
<
QueryLog
>
Context
::
getQueryLog
()
{
auto
lock
=
getLock
();
if
(
!
shared
->
system_logs
||
!
shared
->
system_logs
->
query_log
)
return
nullptr
;
return
{}
;
return
shared
->
system_logs
->
query_log
.
get
()
;
return
shared
->
system_logs
->
query_log
;
}
QueryThreadLog
*
Context
::
getQueryThreadLog
()
std
::
shared_ptr
<
QueryThreadLog
>
Context
::
getQueryThreadLog
()
{
auto
lock
=
getLock
();
if
(
!
shared
->
system_logs
||
!
shared
->
system_logs
->
query_thread_log
)
return
nullptr
;
return
{}
;
return
shared
->
system_logs
->
query_thread_log
.
get
()
;
return
shared
->
system_logs
->
query_thread_log
;
}
PartLog
*
Context
::
getPartLog
(
const
String
&
part_database
)
std
::
shared_ptr
<
PartLog
>
Context
::
getPartLog
(
const
String
&
part_database
)
{
auto
lock
=
getLock
();
/// No part log or system logs are shutting down.
if
(
!
shared
->
system_logs
||
!
shared
->
system_logs
->
part_log
)
return
nullptr
;
return
{}
;
/// Will not log operations on system tables (including part_log itself).
/// It doesn't make sense and not allow to destruct PartLog correctly due to infinite logging and flushing,
/// and also make troubles on startup.
if
(
part_database
==
shared
->
system_logs
->
part_log_database
)
return
nullptr
;
return
{}
;
return
shared
->
system_logs
->
part_log
.
get
()
;
return
shared
->
system_logs
->
part_log
;
}
...
...
dbms/src/Interpreters/Context.h
浏览文件 @
b919876a
...
...
@@ -402,12 +402,12 @@ public:
void
initializeSystemLogs
();
/// Nullptr if the query log is not ready for this moment.
QueryLog
*
getQueryLog
();
QueryThreadLog
*
getQueryThreadLog
();
std
::
shared_ptr
<
QueryLog
>
getQueryLog
();
std
::
shared_ptr
<
QueryThreadLog
>
getQueryThreadLog
();
/// Returns an object used to log opertaions with parts if it possible.
/// Provide table name to make required cheks.
PartLog
*
getPartLog
(
const
String
&
part_database
);
std
::
shared_ptr
<
PartLog
>
getPartLog
(
const
String
&
part_database
);
const
MergeTreeSettings
&
getMergeTreeSettings
()
const
;
...
...
dbms/src/Interpreters/PartLog.cpp
浏览文件 @
b919876a
...
...
@@ -104,7 +104,7 @@ bool PartLog::addNewParts(Context & current_context, const PartLog::MutableDataP
if
(
parts
.
empty
())
return
true
;
PartLog
*
part_log
=
nullptr
;
std
::
shared_ptr
<
PartLog
>
part_log
;
try
{
...
...
dbms/src/Interpreters/SystemLog.cpp
浏览文件 @
b919876a
...
...
@@ -16,7 +16,7 @@ constexpr size_t DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS = 7500;
/// Creates a system log with MergeTree engine using parameters from config
template
<
typename
TSystemLog
>
std
::
unique
_ptr
<
TSystemLog
>
createSystemLog
(
std
::
shared
_ptr
<
TSystemLog
>
createSystemLog
(
Context
&
context
,
const
String
&
default_database_name
,
const
String
&
default_table_name
,
...
...
@@ -33,7 +33,7 @@ std::unique_ptr<TSystemLog> createSystemLog(
size_t
flush_interval_milliseconds
=
config
.
getUInt64
(
config_prefix
+
".flush_interval_milliseconds"
,
DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS
);
return
std
::
make_
unique
<
TSystemLog
>
(
context
,
database
,
table
,
engine
,
flush_interval_milliseconds
);
return
std
::
make_
shared
<
TSystemLog
>
(
context
,
database
,
table
,
engine
,
flush_interval_milliseconds
);
}
}
...
...
@@ -49,6 +49,14 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
}
SystemLogs
::~
SystemLogs
()
=
default
;
SystemLogs
::~
SystemLogs
()
{
if
(
query_log
)
query_log
->
shutdown
();
if
(
query_thread_log
)
query_thread_log
->
shutdown
();
if
(
part_log
)
part_log
->
shutdown
();
}
}
dbms/src/Interpreters/SystemLog.h
浏览文件 @
b919876a
#pragma once
#include <thread>
#include <atomic>
#include <boost/noncopyable.hpp>
#include <common/logger_useful.h>
#include <Core/Types.h>
...
...
@@ -66,9 +67,9 @@ struct SystemLogs
SystemLogs
(
Context
&
global_context
,
const
Poco
::
Util
::
AbstractConfiguration
&
config
);
~
SystemLogs
();
std
::
unique
_ptr
<
QueryLog
>
query_log
;
/// Used to log queries.
std
::
unique
_ptr
<
QueryThreadLog
>
query_thread_log
;
/// Used to log query threads.
std
::
unique
_ptr
<
PartLog
>
part_log
;
/// Used to log operations with parts
std
::
shared
_ptr
<
QueryLog
>
query_log
;
/// Used to log queries.
std
::
shared
_ptr
<
QueryThreadLog
>
query_thread_log
;
/// Used to log query threads.
std
::
shared
_ptr
<
PartLog
>
part_log
;
/// Used to log operations with parts
String
part_log_database
;
};
...
...
@@ -78,7 +79,6 @@ template <typename LogElement>
class
SystemLog
:
private
boost
::
noncopyable
{
public:
using
Self
=
SystemLog
;
/** Parameter: table name where to write log.
...
...
@@ -103,13 +103,23 @@ public:
*/
void
add
(
const
LogElement
&
element
)
{
if
(
is_shutdown
)
return
;
/// Without try we could block here in case of queue overflow.
if
(
!
queue
.
tryPush
({
false
,
element
}))
LOG_ERROR
(
log
,
"SystemLog queue is full"
);
}
/// Flush data in the buffer to disk
void
flush
(
bool
quiet
=
false
);
void
flush
()
{
if
(
!
is_shutdown
)
flushImpl
(
false
);
}
/// Stop the background flush thread before destructor. No more data will be written.
void
shutdown
();
protected:
Context
&
context
;
...
...
@@ -118,6 +128,7 @@ protected:
const
String
storage_def
;
StoragePtr
table
;
const
size_t
flush_interval_milliseconds
;
std
::
atomic
<
bool
>
is_shutdown
{
false
};
using
QueueItem
=
std
::
pair
<
bool
,
LogElement
>
;
/// First element is shutdown flag for thread.
...
...
@@ -145,6 +156,8 @@ protected:
*/
bool
is_prepared
=
false
;
void
prepareTable
();
void
flushImpl
(
bool
quiet
);
};
...
...
@@ -166,14 +179,25 @@ SystemLog<LogElement>::SystemLog(Context & context_,
template
<
typename
LogElement
>
SystemLog
<
LogElement
>::~
SystemLog
()
void
SystemLog
<
LogElement
>::
shutdown
()
{
bool
old_val
=
false
;
if
(
!
is_shutdown
.
compare_exchange_strong
(
old_val
,
true
))
return
;
/// Tell thread to shutdown.
queue
.
push
({
true
,
{}});
saving_thread
.
join
();
}
template
<
typename
LogElement
>
SystemLog
<
LogElement
>::~
SystemLog
()
{
shutdown
();
}
template
<
typename
LogElement
>
void
SystemLog
<
LogElement
>::
threadFunction
()
{
...
...
@@ -236,7 +260,7 @@ void SystemLog<LogElement>::threadFunction()
if
(
milliseconds_elapsed
>=
flush_interval_milliseconds
)
{
/// Write data to a table.
flush
(
true
);
flush
Impl
(
true
);
time_after_last_write
.
restart
();
}
}
...
...
@@ -251,7 +275,7 @@ void SystemLog<LogElement>::threadFunction()
template
<
typename
LogElement
>
void
SystemLog
<
LogElement
>::
flush
(
bool
quiet
)
void
SystemLog
<
LogElement
>::
flush
Impl
(
bool
quiet
)
{
std
::
unique_lock
lock
(
data_mutex
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录