Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
dce56bfe
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
dce56bfe
编写于
10月 14, 2016
作者:
A
Alexey Milovidov
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Implemented TODO [#METR-23132].
上级
4bd127a8
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
47 addition
and
59 deletion
+47
-59
dbms/include/DB/Storages/MergeTree/BackgroundProcessingPool.h
.../include/DB/Storages/MergeTree/BackgroundProcessingPool.h
+9
-9
dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp
dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp
+38
-50
未找到文件。
dbms/include/DB/Storages/MergeTree/BackgroundProcessingPool.h
浏览文件 @
dce56bfe
...
...
@@ -8,6 +8,7 @@
#include <mutex>
#include <Poco/RWLock.h>
#include <Poco/Event.h>
#include <Poco/Timestamp.h>
#include <DB/Core/Types.h>
namespace
DB
...
...
@@ -51,30 +52,29 @@ public:
Counters
&
local_counters
;
};
///
Возвращает true, если что-то получилось сделать. В таком случае поток не будет спать перед следующим вызовом
.
///
Returns true, if some useful work was done. In that case, thread will not sleep before next run of this task
.
using
Task
=
std
::
function
<
bool
(
Context
&
context
)
>
;
class
TaskInfo
{
public:
///
Разбудить какой-нибудь поток
.
///
Wake up any thread
.
void
wake
();
TaskInfo
(
BackgroundProcessingPool
&
pool_
,
const
Task
&
function_
)
:
pool
(
pool_
),
function
(
function_
)
{}
private:
friend
class
BackgroundProcessingPool
;
BackgroundProcessingPool
&
pool
;
Task
function
;
///
При выполнении задачи, держится read lock
.
///
Read lock is hold when task is executed
.
Poco
::
RWLock
rwlock
;
std
::
atomic
<
bool
>
removed
{
false
};
std
::
atomic
<
time_t
>
next_time_to_execute
{
0
};
/// Приоритет задачи. Для совпадающего времени в секундах берётся первая по списку задача.
std
::
list
<
std
::
shared_ptr
<
TaskInfo
>>::
iterator
iterator
;
TaskInfo
(
BackgroundProcessingPool
&
pool_
,
const
Task
&
function_
)
:
pool
(
pool_
),
function
(
function_
)
{}
std
::
multimap
<
Poco
::
Timestamp
,
std
::
shared_ptr
<
TaskInfo
>>::
iterator
iterator
;
};
using
TaskHandle
=
std
::
shared_ptr
<
TaskInfo
>
;
...
...
@@ -95,14 +95,14 @@ public:
~
BackgroundProcessingPool
();
private:
using
Tasks
=
std
::
list
<
TaskHandle
>
;
using
Tasks
=
std
::
multimap
<
Poco
::
Timestamp
,
TaskHandle
>
;
/// key is desired next time to execute (priority).
using
Threads
=
std
::
vector
<
std
::
thread
>
;
const
size_t
size
;
static
constexpr
double
sleep_seconds
=
10
;
static
constexpr
double
sleep_seconds_random_part
=
1.0
;
Tasks
tasks
;
///
Задачи в порядке, в котором мы планируем их выполнять
.
Tasks
tasks
;
///
Ordered in priority
.
std
::
mutex
tasks_mutex
;
Counters
counters
;
...
...
dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp
浏览文件 @
dce56bfe
...
...
@@ -20,18 +20,23 @@ void BackgroundProcessingPool::TaskInfo::wake()
if
(
removed
)
return
;
time_t
current_time
=
time
(
0
)
;
Poco
::
Timestamp
current_time
;
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
pool
.
tasks_mutex
);
pool
.
tasks
.
splice
(
pool
.
tasks
.
begin
(),
pool
.
tasks
,
iterator
);
/// Если эта задача в прошлый раз ничего не сделала, и ей было назначено спать, то отменим время сна.
auto
next_time_to_execute
=
iterator
->
first
;
TaskHandle
this_task_handle
=
iterator
->
second
;
/// If this task was done nothing at previous time and it has to sleep, then cancel sleep time.
if
(
next_time_to_execute
>
current_time
)
next_time_to_execute
=
current_time
;
pool
.
tasks
.
erase
(
iterator
);
iterator
=
pool
.
tasks
.
emplace
(
next_time_to_execute
,
this_task_handle
);
}
///
Если все потоки сейчас выполняют работу, этот вызов никого не разбудит
.
///
Note that if all threads are currently do some work, this call will not wakeup any thread
.
pool
.
wake_event
.
notify_one
();
}
...
...
@@ -54,11 +59,13 @@ int BackgroundProcessingPool::getCounter(const String & name)
BackgroundProcessingPool
::
TaskHandle
BackgroundProcessingPool
::
addTask
(
const
Task
&
task
)
{
TaskHandle
res
(
new
TaskInfo
(
*
this
,
task
));
TaskHandle
res
=
std
::
make_shared
<
TaskInfo
>
(
*
this
,
task
);
Poco
::
Timestamp
current_time
;
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
tasks_mutex
);
res
->
iterator
=
tasks
.
insert
(
tasks
.
begin
()
,
res
);
res
->
iterator
=
tasks
.
emplace
(
current_time
,
res
);
}
wake_event
.
notify_all
();
...
...
@@ -71,7 +78,7 @@ void BackgroundProcessingPool::removeTask(const TaskHandle & task)
if
(
task
->
removed
.
exchange
(
true
))
return
;
///
Дождёмся завершения всех выполнений этой задачи
.
///
Wait for all execution of this task
.
{
Poco
::
ScopedWriteRWLock
wlock
(
task
->
rwlock
);
}
...
...
@@ -108,49 +115,27 @@ void BackgroundProcessingPool::threadFunction()
while
(
!
shutdown
)
{
Counters
counters_diff
;
bool
has_exception
=
false
;
bool
done_work
=
false
;
TaskHandle
task
;
try
{
TaskHandle
task
;
time_t
min_time
=
std
::
numeric_limits
<
time_t
>::
max
();
Poco
::
Timestamp
min_time
;
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
tasks_mutex
);
if
(
!
tasks
.
empty
())
{
/** Number of tasks is about number of tables of MergeTree family.
* Select task with minimal 'next_time_to_execute', and place to end of queue.
* Remind that one task could be selected and executed simultaneously from many threads.
*
* Tasks is like priority queue,
* but we must have ability to change priority of any task in queue.
*
* If there is too much tasks, select from first 100.
* TODO Change list to multimap.
*/
size_t
i
=
0
;
for
(
const
auto
&
handle
:
tasks
)
for
(
const
auto
&
time_handle
:
tasks
)
{
if
(
handle
->
removed
)
continue
;
time_t
next_time_to_execute
=
handle
->
next_time_to_execute
;
if
(
next_time_to_execute
<
min_time
)
if
(
!
time_handle
.
second
->
removed
)
{
min_time
=
next_time_to_execute
;
task
=
handle
;
}
++
i
;
if
(
i
>
100
)
min_time
=
time_handle
.
first
;
task
=
time_handle
.
second
;
break
;
}
}
if
(
task
)
/// Переложим в конец очереди (уменьшим приоритет среди задач с одинаковым next_time_to_execute).
tasks
.
splice
(
tasks
.
end
(),
tasks
,
task
->
iterator
);
}
}
...
...
@@ -166,13 +151,13 @@ void BackgroundProcessingPool::threadFunction()
continue
;
}
///
Лучшей задачи не нашлось, а эта задача в прошлый раз ничего не сделала, и поэтому ей назначено некоторое время спать
.
time_t
current_time
=
time
(
0
)
;
///
No tasks ready for execution
.
Poco
::
Timestamp
current_time
;
if
(
min_time
>
current_time
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
tasks_mutex
);
wake_event
.
wait_for
(
lock
,
std
::
chrono
::
duration
<
double
>
(
min_time
-
current_time
+
std
::
uniform_
real_distribution
<
double
>
(
0
,
sleep_seconds_random_part
)(
rng
)));
wake_event
.
wait_for
(
lock
,
std
::
chrono
::
microseconds
(
min_time
-
current_time
+
std
::
uniform_
int_distribution
<
uint64_t
>
(
0
,
sleep_seconds_random_part
*
1000000
)(
rng
)));
}
Poco
::
ScopedReadRWLock
rlock
(
task
->
rwlock
);
...
...
@@ -184,20 +169,15 @@ void BackgroundProcessingPool::threadFunction()
CurrentMetrics
::
Increment
metric_increment
{
CurrentMetrics
::
BackgroundPoolTask
};
Context
context
(
*
this
,
counters_diff
);
bool
done_work
=
task
->
function
(
context
);
/// Если задача сделала полезную работу, то она сможет выполняться в следующий раз хоть сразу.
/// Если нет - добавляем задержку перед повторным исполнением.
task
->
next_time_to_execute
=
time
(
0
)
+
(
done_work
?
0
:
sleep_seconds
);
done_work
=
task
->
function
(
context
);
}
}
catch
(...)
{
has_exception
=
true
;
tryLogCurrentException
(
__PRETTY_FUNCTION__
);
}
///
Вычтем все счётчики обратно
.
///
Subtract counters backwards
.
if
(
!
counters_diff
.
empty
())
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
counters_mutex
);
...
...
@@ -208,10 +188,18 @@ void BackgroundProcessingPool::threadFunction()
if
(
shutdown
)
break
;
if
(
has_exception
)
/// If task has done work, it could be executed again immediately.
/// If not, add delay before next run.
Poco
::
Timestamp
next_time_to_execute
=
Poco
::
Timestamp
()
+
(
done_work
?
0
:
sleep_seconds
*
1000000
);
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
tasks_mutex
);
wake_event
.
wait_for
(
lock
,
std
::
chrono
::
duration
<
double
>
(
sleep_seconds
));
if
(
task
->
removed
)
return
;
tasks
.
erase
(
task
->
iterator
);
task
->
iterator
=
tasks
.
emplace
(
next_time_to_execute
,
task
);
}
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录