Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
6f3d8f0e
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
6f3d8f0e
编写于
6月 21, 2015
作者:
A
Alexey Milovidov
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
dbms: added simple priorities system [#METR-16911].
上级
001ffdfa
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
266 addition
and
96 deletion
+266
-96
dbms/include/DB/DataStreams/BlockIO.h
dbms/include/DB/DataStreams/BlockIO.h
+5
-2
dbms/include/DB/DataStreams/IProfilingBlockInputStream.h
dbms/include/DB/DataStreams/IProfilingBlockInputStream.h
+3
-3
dbms/include/DB/Interpreters/ProcessList.h
dbms/include/DB/Interpreters/ProcessList.h
+46
-87
dbms/include/DB/Interpreters/QueryPriorities.h
dbms/include/DB/Interpreters/QueryPriorities.h
+117
-0
dbms/include/DB/Interpreters/Settings.h
dbms/include/DB/Interpreters/Settings.h
+3
-0
dbms/src/DataStreams/BlockIO.cpp
dbms/src/DataStreams/BlockIO.cpp
+9
-0
dbms/src/DataStreams/IProfilingBlockInputStream.cpp
dbms/src/DataStreams/IProfilingBlockInputStream.cpp
+2
-1
dbms/src/Interpreters/ProcessList.cpp
dbms/src/Interpreters/ProcessList.cpp
+72
-0
dbms/src/Interpreters/executeQuery.cpp
dbms/src/Interpreters/executeQuery.cpp
+7
-3
dbms/src/Server/Server.cpp
dbms/src/Server/Server.cpp
+1
-0
dbms/src/Storages/StorageSystemProcesses.cpp
dbms/src/Storages/StorageSystemProcesses.cpp
+1
-0
未找到文件。
dbms/include/DB/DataStreams/BlockIO.h
浏览文件 @
6f3d8f0e
...
...
@@ -2,12 +2,13 @@
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/Interpreters/ProcessList.h>
namespace
DB
{
class
ProcessListEntry
;
struct
BlockIO
{
/** process_list_entry должен уничтожаться позже, чем in и out,
...
...
@@ -15,7 +16,7 @@ struct BlockIO
* (MemoryTracker * current_memory_tracker),
* которая может использоваться до уничтожения in и out.
*/
ProcessList
::
EntryPtr
process_list_entry
;
std
::
shared_ptr
<
ProcessListEntry
>
process_list_entry
;
BlockInputStreamPtr
in
;
BlockOutputStreamPtr
out
;
...
...
@@ -38,6 +39,8 @@ struct BlockIO
return
*
this
;
}
~
BlockIO
();
};
}
dbms/include/DB/DataStreams/IProfilingBlockInputStream.h
浏览文件 @
6f3d8f0e
...
...
@@ -3,7 +3,6 @@
#include <DB/Core/Progress.h>
#include <DB/Interpreters/Limits.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/DataStreams/BlockStreamProfileInfo.h>
#include <DB/DataStreams/IBlockInputStream.h>
...
...
@@ -14,6 +13,7 @@ namespace DB
{
class
QuotaForIntervals
;
class
ProcessListElement
;
/** Смотрит за тем, как работает источник блоков.
...
...
@@ -82,7 +82,7 @@ public:
* На основе этой информации будет проверяться квота, и некоторые ограничения.
* Также эта информация будет доступна в запросе SHOW PROCESSLIST.
*/
void
setProcessListElement
(
ProcessList
::
Element
*
elem
);
void
setProcessListElement
(
ProcessListElement
*
elem
);
/** Установить информацию о приблизительном общем количестве строк, которых нужно прочитать.
*/
...
...
@@ -154,7 +154,7 @@ protected:
BlockStreamProfileInfo
info
;
std
::
atomic
<
bool
>
is_cancelled
{
false
};
ProgressCallback
progress_callback
;
ProcessList
::
Element
*
process_list_elem
=
nullptr
;
ProcessListElement
*
process_list_elem
=
nullptr
;
bool
enabled_extremes
=
false
;
...
...
dbms/include/DB/Interpreters/ProcessList.h
浏览文件 @
6f3d8f0e
...
...
@@ -2,7 +2,7 @@
#include <map>
#include <list>
#include <
Poco/SharedPtr.h
>
#include <
memory
>
#include <Poco/Mutex.h>
#include <Poco/Condition.h>
#include <Poco/Net/IPAddress.h>
...
...
@@ -13,6 +13,7 @@
#include <DB/Core/ErrorCodes.h>
#include <DB/Common/MemoryTracker.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Interpreters/QueryPriorities.h>
namespace
DB
...
...
@@ -36,13 +37,16 @@ struct ProcessListElement
MemoryTracker
memory_tracker
;
QueryPriorities
::
Handle
priority_handle
;
bool
is_cancelled
=
false
;
ProcessListElement
(
const
String
&
query_
,
const
String
&
user_
,
const
String
&
query_id_
,
const
Poco
::
Net
::
IPAddress
&
ip_address_
,
size_t
max_memory_usage
)
:
query
(
query_
),
user
(
user_
),
query_id
(
query_id_
),
ip_address
(
ip_address_
),
memory_tracker
(
max_memory_usage
)
size_t
max_memory_usage
,
QueryPriorities
::
Handle
&&
priority_handle_
)
:
query
(
query_
),
user
(
user_
),
query_id
(
query_id_
),
ip_address
(
ip_address_
),
memory_tracker
(
max_memory_usage
),
priority_handle
(
std
::
move
(
priority_handle_
))
{
current_memory_tracker
=
&
memory_tracker
;
}
...
...
@@ -55,126 +59,81 @@ struct ProcessListElement
bool
update
(
const
Progress
&
value
)
{
progress
.
incrementPiecewiseAtomically
(
value
);
if
(
priority_handle
)
priority_handle
->
waitIfNeed
(
std
::
chrono
::
seconds
(
1
));
/// NOTE Можно сделать настраиваемым таймаут.
return
!
is_cancelled
;
}
};
class
ProcessList
;
/// Держит итератор на список, и удаляет элемент из списка в деструкторе.
class
ProcessListEntry
{
private:
using
Container
=
std
::
list
<
ProcessListElement
>
;
ProcessList
&
parent
;
Container
::
iterator
it
;
public:
ProcessListEntry
(
ProcessList
&
parent_
,
Container
::
iterator
it_
)
:
parent
(
parent_
),
it
(
it_
)
{}
~
ProcessListEntry
();
ProcessListElement
*
operator
->
()
{
return
&*
it
;
}
const
ProcessListElement
*
operator
->
()
const
{
return
&*
it
;
}
ProcessListElement
&
get
()
{
return
*
it
;
}
const
ProcessListElement
&
get
()
const
{
return
*
it
;
}
};
class
ProcessList
{
friend
class
Entry
;
friend
class
ProcessList
Entry
;
public:
using
Element
=
ProcessListElement
;
using
Entry
=
ProcessListEntry
;
/// list, чтобы итераторы не инвалидировались. NOTE: можно заменить на cyclic buffer, но почти незачем.
typedef
std
::
list
<
Element
>
Containter
;
using
Container
=
std
::
list
<
Element
>
;
/// Query_id -> Element *
typedef
std
::
unordered_map
<
String
,
Element
*>
QueryToElement
;
using
QueryToElement
=
std
::
unordered_map
<
String
,
Element
*>
;
/// User -> Query_id -> Element *
typedef
std
::
unordered_map
<
String
,
QueryToElement
>
UserToQueries
;
using
UserToQueries
=
std
::
unordered_map
<
String
,
QueryToElement
>
;
private:
mutable
Poco
::
FastMutex
mutex
;
mutable
Poco
::
Condition
have_space
;
/// Количество одновременно выполняющихся запросов стало меньше максимального.
Contain
t
er
cont
;
Container
cont
;
size_t
cur_size
;
/// В C++03 std::list::size не O(1).
size_t
max_size
;
/// Если 0 - не ограничено. Иначе, если пытаемся добавить больше - кидается исключение.
UserToQueries
user_to_queries
;
/// Держит итератор на список, и удаляет элемент из списка в деструкторе.
class
Entry
{
private:
ProcessList
&
parent
;
Containter
::
iterator
it
;
public:
Entry
(
ProcessList
&
parent_
,
Containter
::
iterator
it_
)
:
parent
(
parent_
),
it
(
it_
)
{}
~
Entry
()
{
Poco
::
ScopedLock
<
Poco
::
FastMutex
>
lock
(
parent
.
mutex
);
/// В случае, если запрос отменяется, данные о нем удаляются из мапа в момент отмены.
if
(
!
it
->
is_cancelled
&&
!
it
->
query_id
.
empty
())
{
UserToQueries
::
iterator
queries
=
parent
.
user_to_queries
.
find
(
it
->
user
);
if
(
queries
!=
parent
.
user_to_queries
.
end
())
{
QueryToElement
::
iterator
element
=
queries
->
second
.
find
(
it
->
query_id
);
if
(
element
!=
queries
->
second
.
end
())
queries
->
second
.
erase
(
element
);
}
}
parent
.
cont
.
erase
(
it
);
--
parent
.
cur_size
;
parent
.
have_space
.
signal
();
}
Element
*
operator
->
()
{
return
&*
it
;
}
const
Element
*
operator
->
()
const
{
return
&*
it
;
}
Element
&
get
()
{
return
*
it
;
}
const
Element
&
get
()
const
{
return
*
it
;
}
};
QueryPriorities
priorities
;
public:
ProcessList
(
size_t
max_size_
=
0
)
:
cur_size
(
0
),
max_size
(
max_size_
)
{}
typedef
Poco
::
SharedPtr
<
Entry
>
EntryPtr
;
typedef
std
::
shared_ptr
<
ProcessList
Entry
>
EntryPtr
;
/** Зарегистрировать выполняющийся запрос. Возвращает refcounted объект, который удаляет запрос из списка при уничтожении.
* Если выполняющихся запросов сейчас слишком много - ждать не более указанного времени.
* Если времени не хватило - кинуть исключение.
*/
EntryPtr
insert
(
const
String
&
query_
,
const
String
&
user_
,
const
String
&
query_id_
,
const
Poco
::
Net
::
IPAddress
&
ip_address_
,
size_t
max_memory_usage
=
0
,
size_t
max_wait_milliseconds
=
DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS
,
bool
replace_running_query
=
false
)
{
EntryPtr
res
;
{
Poco
::
ScopedLock
<
Poco
::
FastMutex
>
lock
(
mutex
);
if
(
max_size
&&
cur_size
>=
max_size
&&
(
!
max_wait_milliseconds
||
!
have_space
.
tryWait
(
mutex
,
max_wait_milliseconds
)))
throw
Exception
(
"Too much simultaneous queries. Maximum: "
+
toString
(
max_size
),
ErrorCodes
::
TOO_MUCH_SIMULTANEOUS_QUERIES
);
if
(
!
query_id_
.
empty
())
{
UserToQueries
::
iterator
queries
=
user_to_queries
.
find
(
user_
);
if
(
queries
!=
user_to_queries
.
end
())
{
QueryToElement
::
iterator
element
=
queries
->
second
.
find
(
query_id_
);
if
(
element
!=
queries
->
second
.
end
())
{
if
(
!
replace_running_query
)
throw
Exception
(
"Query with id = "
+
query_id_
+
" is already running."
,
ErrorCodes
::
QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING
);
element
->
second
->
is_cancelled
=
true
;
/// В случае если запрос отменяется, данные о нем удаляются из мапа в момент отмены.
queries
->
second
.
erase
(
element
);
}
}
}
++
cur_size
;
res
=
new
Entry
(
*
this
,
cont
.
emplace
(
cont
.
end
(),
query_
,
user_
,
query_id_
,
ip_address_
,
max_memory_usage
));
if
(
!
query_id_
.
empty
())
user_to_queries
[
user_
][
query_id_
]
=
&
res
->
get
();
}
return
res
;
}
size_t
max_memory_usage
,
size_t
max_wait_milliseconds
,
bool
replace_running_query
,
QueryPriorities
::
Priority
priority
);
/// Количество одновременно выполняющихся запросов.
size_t
size
()
const
{
return
cur_size
;
}
/// Получить текущее состояние (копию) списка запросов.
Contain
t
er
get
()
const
Container
get
()
const
{
Poco
::
ScopedLock
<
Poco
::
FastMutex
>
lock
(
mutex
);
return
cont
;
...
...
dbms/include/DB/Interpreters/QueryPriorities.h
0 → 100644
浏览文件 @
6f3d8f0e
#pragma once
#include <map>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <chrono>
/** Реализует приоритеты запросов.
* Позволяет приостанавливать выполнение запроса, если выполняется хотя бы один более приоритетный запрос.
*
* Величина приоритета - целое число, чем меньше - тем больше приоритет.
*
* Приоритет 0 считается особенным - запросы с таким приоритетом выполняются всегда,
* не зависят от других запросов и не влияют на другие запросы.
* То есть 0 означает - не использовать приоритеты.
*
* NOTE Возможности сделать лучше:
* - реализовать ограничение на максимальное количество запросов с таким приоритетом.
*/
class
QueryPriorities
{
public:
using
Priority
=
int
;
private:
friend
struct
Handle
;
using
Count
=
int
;
/// Количество выполняющихся сейчас запросов с заданным приоритетом.
using
Container
=
std
::
map
<
Priority
,
Count
>
;
std
::
mutex
mutex
;
std
::
condition_variable
condvar
;
Container
container
;
/** Если есть более приоритетные запросы - спать, пока они не перестанут быть или не истечёт таймаут.
* Возвращает true, если более приоритетные запросы исчезли на момент возврата из функции, false, если истёк таймаут.
*/
template
<
typename
Duration
>
bool
waitIfNeed
(
Priority
priority
,
Duration
timeout
)
{
if
(
0
==
priority
)
return
true
;
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex
);
while
(
true
)
{
/// Если ли хотя бы один более приоритетный запрос?
bool
found
=
false
;
for
(
const
auto
&
value
:
container
)
{
if
(
value
.
first
>=
priority
)
break
;
if
(
value
.
second
>
0
)
{
found
=
true
;
break
;
}
}
if
(
!
found
)
return
true
;
if
(
std
::
cv_status
::
timeout
==
condvar
.
wait_for
(
lock
,
timeout
))
return
false
;
}
}
public:
struct
HandleImpl
{
private:
QueryPriorities
&
parent
;
QueryPriorities
::
Container
::
value_type
&
value
;
public:
HandleImpl
(
QueryPriorities
&
parent_
,
QueryPriorities
::
Container
::
value_type
&
value_
)
:
parent
(
parent_
),
value
(
value_
)
{}
~
HandleImpl
()
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
parent
.
mutex
);
--
value
.
second
;
}
parent
.
condvar
.
notify_all
();
}
template
<
typename
Duration
>
bool
waitIfNeed
(
Duration
timeout
)
{
return
parent
.
waitIfNeed
(
value
.
first
,
timeout
);
}
};
using
Handle
=
std
::
shared_ptr
<
HandleImpl
>
;
/** Зарегистрировать, что запрос с заданным приоритетом выполняется.
* Возвращается объект, в деструкторе которого, запись о запросе удаляется.
*/
Handle
insert
(
Priority
priority
)
{
if
(
0
==
priority
)
return
{};
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
auto
it
=
container
.
emplace
(
priority
,
0
).
first
;
++
it
->
second
;
return
std
::
make_shared
<
HandleImpl
>
(
*
this
,
*
it
);
}
};
dbms/include/DB/Interpreters/Settings.h
浏览文件 @
6f3d8f0e
...
...
@@ -132,6 +132,9 @@ struct Settings
\
/** Позволяет выбирать метод сжатия данных при записи */
\
M(SettingCompressionMethod, network_compression_method, CompressionMethod::LZ4) \
\
/** Приоритет запроса. 1 - самый высокий, больше - ниже; 0 - не использовать приоритеты. */
\
M(SettingUInt64, priority, 0) \
/// Всевозможные ограничения на выполнение запроса.
Limits
limits
;
...
...
dbms/src/DataStreams/BlockIO.cpp
0 → 100644
浏览文件 @
6f3d8f0e
#include <DB/Interpreters/ProcessList.h>
#include <DB/DataStreams/BlockIO.h>
namespace
DB
{
BlockIO
::~
BlockIO
()
=
default
;
}
dbms/src/DataStreams/IProfilingBlockInputStream.cpp
浏览文件 @
6f3d8f0e
...
...
@@ -5,6 +5,7 @@
#include <DB/Columns/ColumnConst.h>
#include <DB/Interpreters/Quota.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
...
...
@@ -320,7 +321,7 @@ void IProfilingBlockInputStream::setProgressCallback(ProgressCallback callback)
}
void
IProfilingBlockInputStream
::
setProcessListElement
(
ProcessList
::
Element
*
elem
)
void
IProfilingBlockInputStream
::
setProcessListElement
(
ProcessListElement
*
elem
)
{
process_list_elem
=
elem
;
...
...
dbms/src/Interpreters/ProcessList.cpp
0 → 100644
浏览文件 @
6f3d8f0e
#include <DB/Interpreters/ProcessList.h>
namespace
DB
{
ProcessList
::
EntryPtr
ProcessList
::
insert
(
const
String
&
query_
,
const
String
&
user_
,
const
String
&
query_id_
,
const
Poco
::
Net
::
IPAddress
&
ip_address_
,
size_t
max_memory_usage
,
size_t
max_wait_milliseconds
,
bool
replace_running_query
,
QueryPriorities
::
Priority
priority
)
{
EntryPtr
res
;
{
Poco
::
ScopedLock
<
Poco
::
FastMutex
>
lock
(
mutex
);
if
(
max_size
&&
cur_size
>=
max_size
&&
(
!
max_wait_milliseconds
||
!
have_space
.
tryWait
(
mutex
,
max_wait_milliseconds
)))
throw
Exception
(
"Too much simultaneous queries. Maximum: "
+
toString
(
max_size
),
ErrorCodes
::
TOO_MUCH_SIMULTANEOUS_QUERIES
);
if
(
!
query_id_
.
empty
())
{
UserToQueries
::
iterator
queries
=
user_to_queries
.
find
(
user_
);
if
(
queries
!=
user_to_queries
.
end
())
{
QueryToElement
::
iterator
element
=
queries
->
second
.
find
(
query_id_
);
if
(
element
!=
queries
->
second
.
end
())
{
if
(
!
replace_running_query
)
throw
Exception
(
"Query with id = "
+
query_id_
+
" is already running."
,
ErrorCodes
::
QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING
);
element
->
second
->
is_cancelled
=
true
;
/// В случае если запрос отменяется, данные о нем удаляются из мапа в момент отмены.
queries
->
second
.
erase
(
element
);
}
}
}
++
cur_size
;
res
.
reset
(
new
Entry
(
*
this
,
cont
.
emplace
(
cont
.
end
(),
query_
,
user_
,
query_id_
,
ip_address_
,
max_memory_usage
,
priorities
.
insert
(
priority
))));
if
(
!
query_id_
.
empty
())
user_to_queries
[
user_
][
query_id_
]
=
&
res
->
get
();
}
return
res
;
}
ProcessListEntry
::~
ProcessListEntry
()
{
Poco
::
ScopedLock
<
Poco
::
FastMutex
>
lock
(
parent
.
mutex
);
/// В случае, если запрос отменяется, данные о нем удаляются из мапа в момент отмены.
if
(
!
it
->
is_cancelled
&&
!
it
->
query_id
.
empty
())
{
ProcessList
::
UserToQueries
::
iterator
queries
=
parent
.
user_to_queries
.
find
(
it
->
user
);
if
(
queries
!=
parent
.
user_to_queries
.
end
())
{
ProcessList
::
QueryToElement
::
iterator
element
=
queries
->
second
.
find
(
it
->
query_id
);
if
(
element
!=
queries
->
second
.
end
())
queries
->
second
.
erase
(
element
);
}
}
parent
.
cont
.
erase
(
it
);
--
parent
.
cur_size
;
parent
.
have_space
.
signal
();
}
}
dbms/src/Interpreters/executeQuery.cpp
浏览文件 @
6f3d8f0e
...
...
@@ -14,6 +14,7 @@
#include <DB/Interpreters/Quota.h>
#include <DB/Interpreters/InterpreterFactory.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Interpreters/executeQuery.h>
...
...
@@ -88,11 +89,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
ProcessList
::
EntryPtr
process_list_entry
;
if
(
!
internal
&&
nullptr
==
typeid_cast
<
const
ASTShowProcesslistQuery
*>
(
&*
ast
))
{
const
Settings
&
settings
=
context
.
getSettingsRef
();
process_list_entry
=
context
.
getProcessList
().
insert
(
query
,
context
.
getUser
(),
context
.
getCurrentQueryId
(),
context
.
getIPAddress
(),
context
.
getSettingsRef
().
limits
.
max_memory_usage
,
context
.
getSettingsRef
().
queue_max_wait_ms
.
totalMilliseconds
(),
context
.
getSettingsRef
().
replace_running_query
);
settings
.
limits
.
max_memory_usage
,
settings
.
queue_max_wait_ms
.
totalMilliseconds
(),
settings
.
replace_running_query
,
settings
.
priority
);
context
.
setProcessListElement
(
&
process_list_entry
->
get
());
}
...
...
dbms/src/Server/Server.cpp
浏览文件 @
6f3d8f0e
...
...
@@ -20,6 +20,7 @@
#include <DB/Common/Macros.h>
#include <DB/Common/getFQDNOrHostName.h>
#include <DB/Interpreters/loadMetadata.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Storages/StorageSystemNumbers.h>
#include <DB/Storages/StorageSystemTables.h>
#include <DB/Storages/StorageSystemParts.h>
...
...
dbms/src/Storages/StorageSystemProcesses.cpp
浏览文件 @
6f3d8f0e
...
...
@@ -2,6 +2,7 @@
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Storages/StorageSystemProcesses.h>
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录