Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
a2d9ab6e
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
a2d9ab6e
编写于
6月 22, 2015
作者:
E
Evgeniy Gatov
浏览文件
操作
浏览文件
下载
差异文件
Merge
上级
1628366a
ab945e6d
变更
21
隐藏空白更改
内联
并排
Showing
21 changed file
with
313 addition
and
149 deletion
+313
-149
dbms/include/DB/Columns/ColumnReplicated.h
dbms/include/DB/Columns/ColumnReplicated.h
+0
-25
dbms/include/DB/Columns/IColumnDummy.h
dbms/include/DB/Columns/IColumnDummy.h
+1
-6
dbms/include/DB/DataStreams/BlockIO.h
dbms/include/DB/DataStreams/BlockIO.h
+5
-2
dbms/include/DB/DataStreams/IProfilingBlockInputStream.h
dbms/include/DB/DataStreams/IProfilingBlockInputStream.h
+3
-3
dbms/include/DB/Functions/FunctionsHigherOrder.h
dbms/include/DB/Functions/FunctionsHigherOrder.h
+1
-2
dbms/include/DB/Functions/FunctionsMiscellaneous.h
dbms/include/DB/Functions/FunctionsMiscellaneous.h
+8
-11
dbms/include/DB/Interpreters/ProcessList.h
dbms/include/DB/Interpreters/ProcessList.h
+46
-87
dbms/include/DB/Interpreters/QueryPriorities.h
dbms/include/DB/Interpreters/QueryPriorities.h
+117
-0
dbms/include/DB/Interpreters/Settings.h
dbms/include/DB/Interpreters/Settings.h
+3
-0
dbms/src/DataStreams/BlockIO.cpp
dbms/src/DataStreams/BlockIO.cpp
+9
-0
dbms/src/DataStreams/IProfilingBlockInputStream.cpp
dbms/src/DataStreams/IProfilingBlockInputStream.cpp
+2
-1
dbms/src/Functions/FunctionsMiscellaneous.cpp
dbms/src/Functions/FunctionsMiscellaneous.cpp
+1
-0
dbms/src/Interpreters/ProcessList.cpp
dbms/src/Interpreters/ProcessList.cpp
+72
-0
dbms/src/Interpreters/executeQuery.cpp
dbms/src/Interpreters/executeQuery.cpp
+7
-3
dbms/src/Server/Server.cpp
dbms/src/Server/Server.cpp
+1
-0
dbms/src/Storages/StorageSystemProcesses.cpp
dbms/src/Storages/StorageSystemProcesses.cpp
+1
-0
dbms/tests/queries/0_stateless/00177_inserts_through_http_parts.sh
...s/queries/0_stateless/00177_inserts_through_http_parts.sh
+9
-9
dbms/tests/queries/0_stateless/00178_function_replicate.reference
...ts/queries/0_stateless/00178_function_replicate.reference
+10
-0
dbms/tests/queries/0_stateless/00178_function_replicate.sql
dbms/tests/queries/0_stateless/00178_function_replicate.sql
+9
-0
dbms/tests/queries/0_stateless/00179_lambdas_with_common_expressions_and_filter.reference
...0179_lambdas_with_common_expressions_and_filter.reference
+5
-0
dbms/tests/queries/0_stateless/00179_lambdas_with_common_expressions_and_filter.sql
...less/00179_lambdas_with_common_expressions_and_filter.sql
+3
-0
未找到文件。
dbms/include/DB/Columns/ColumnReplicated.h
已删除
100644 → 0
浏览文件 @
1628366a
#pragma once
#include <DB/Columns/IColumnDummy.h>
namespace
DB
{
/** Содержит промежуточные данные для вычисления выражений в функциях высшего порядка.
* Это - вложенный столбец произвольного размера.
* Сам ColumnReplicated притворяется, как столбец указанного в конструкторе размера.
*/
class
ColumnReplicated
final
:
public
IColumnDummy
{
public:
ColumnReplicated
(
size_t
s_
,
ColumnPtr
nested_
)
:
IColumnDummy
(
s_
),
nested
(
nested_
)
{}
std
::
string
getName
()
const
override
{
return
"ColumnReplicated"
;
}
ColumnPtr
cloneDummy
(
size_t
s_
)
const
override
{
return
new
ColumnReplicated
(
s_
,
nested
);
}
ColumnPtr
&
getData
()
{
return
nested
;
}
private:
ColumnPtr
nested
;
};
}
dbms/include/DB/Columns/IColumnDummy.h
浏览文件 @
a2d9ab6e
...
...
@@ -41,12 +41,7 @@ public:
ColumnPtr
filter
(
const
Filter
&
filt
)
const
override
{
size_t
new_size
=
0
;
for
(
Filter
::
const_iterator
it
=
filt
.
begin
();
it
!=
filt
.
end
();
++
it
)
if
(
*
it
)
++
new_size
;
return
cloneDummy
(
new_size
);
return
cloneDummy
(
countBytesInFilter
(
filt
));
}
ColumnPtr
permute
(
const
Permutation
&
perm
,
size_t
limit
)
const
override
...
...
dbms/include/DB/DataStreams/BlockIO.h
浏览文件 @
a2d9ab6e
...
...
@@ -2,12 +2,13 @@
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/Interpreters/ProcessList.h>
namespace
DB
{
class
ProcessListEntry
;
struct
BlockIO
{
/** process_list_entry должен уничтожаться позже, чем in и out,
...
...
@@ -15,7 +16,7 @@ struct BlockIO
* (MemoryTracker * current_memory_tracker),
* которая может использоваться до уничтожения in и out.
*/
ProcessList
::
EntryPtr
process_list_entry
;
std
::
shared_ptr
<
ProcessListEntry
>
process_list_entry
;
BlockInputStreamPtr
in
;
BlockOutputStreamPtr
out
;
...
...
@@ -38,6 +39,8 @@ struct BlockIO
return
*
this
;
}
~
BlockIO
();
};
}
dbms/include/DB/DataStreams/IProfilingBlockInputStream.h
浏览文件 @
a2d9ab6e
...
...
@@ -3,7 +3,6 @@
#include <DB/Core/Progress.h>
#include <DB/Interpreters/Limits.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/DataStreams/BlockStreamProfileInfo.h>
#include <DB/DataStreams/IBlockInputStream.h>
...
...
@@ -14,6 +13,7 @@ namespace DB
{
class
QuotaForIntervals
;
class
ProcessListElement
;
/** Смотрит за тем, как работает источник блоков.
...
...
@@ -82,7 +82,7 @@ public:
* На основе этой информации будет проверяться квота, и некоторые ограничения.
* Также эта информация будет доступна в запросе SHOW PROCESSLIST.
*/
void
setProcessListElement
(
ProcessList
::
Element
*
elem
);
void
setProcessListElement
(
ProcessListElement
*
elem
);
/** Установить информацию о приблизительном общем количестве строк, которых нужно прочитать.
*/
...
...
@@ -154,7 +154,7 @@ protected:
BlockStreamProfileInfo
info
;
std
::
atomic
<
bool
>
is_cancelled
{
false
};
ProgressCallback
progress_callback
;
ProcessList
::
Element
*
process_list_elem
=
nullptr
;
ProcessListElement
*
process_list_elem
=
nullptr
;
bool
enabled_extremes
=
false
;
...
...
dbms/include/DB/Functions/FunctionsHigherOrder.h
浏览文件 @
a2d9ab6e
...
...
@@ -5,7 +5,6 @@
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnReplicated.h>
#include <DB/Columns/ColumnExpression.h>
#include <DB/Functions/IFunction.h>
...
...
@@ -580,7 +579,7 @@ public:
ColumnWithNameAndType
replicated_column
=
block
.
getByPosition
(
prerequisites
[
prerequisite_index
]);
replicated_column
.
name
=
name
;
replicated_column
.
column
=
typeid_cast
<
Column
Replicated
&>
(
*
replicated_column
.
column
).
getData
();
replicated_column
.
column
=
typeid_cast
<
Column
Array
&>
(
*
replicated_column
.
column
).
getDataPtr
();
temp_block
.
insert
(
replicated_column
);
++
prerequisite_index
;
...
...
dbms/include/DB/Functions/FunctionsMiscellaneous.h
浏览文件 @
a2d9ab6e
...
...
@@ -20,7 +20,6 @@
#include <DB/Columns/ColumnSet.h>
#include <DB/Columns/ColumnTuple.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnReplicated.h>
#include <DB/Columns/ColumnAggregateFunction.h>
#include <DB/Common/UnicodeBar.h>
#include <DB/Functions/IFunction.h>
...
...
@@ -51,9 +50,8 @@ namespace DB
* arrayJoin(arr) - особая функция - выполнить её напрямую нельзя;
* используется только чтобы получить тип результата соответствующего выражения.
*
* replicate(x, arr) - копирует x столько раз, сколько элементов в массиве arr;
* например: replicate(1, ['a', 'b', 'c']) = 1, 1, 1.
* не предназначена для пользователя, а используется только как prerequisites для функций высшего порядка.
* replicate(x, arr) - создаёт массив такого же размера как arr, все элементы которого равны x;
* например: replicate(1, ['a', 'b', 'c']) = [1, 1, 1].
*
* sleep(n) - спит n секунд каждый блок.
*
...
...
@@ -570,18 +568,15 @@ public:
};
/** Размножает столбец (первый аргумент) по количеству элементов в массиве (втором аргументе).
* Не предназначена для внешнего использования.
* Так как возвращаемый столбец будет иметь несовпадающий размер с исходными,
* то результат не может быть потом использован в том же блоке, что и аргументы.
/** Создаёт массив, размножая столбец (первый аргумент) по количеству элементов в массиве (втором аргументе).
* Используется только в качестве prerequisites для функций высшего порядка.
*/
class
FunctionReplicate
:
public
IFunction
{
public:
static
constexpr
auto
name
=
"replicate"
;
static
IFunction
*
create
(
const
Context
&
context
)
{
return
new
FunctionReplicate
;
}
/// Получить имя функции.
String
getName
()
const
{
...
...
@@ -600,7 +595,7 @@ class FunctionReplicate : public IFunction
if
(
!
array_type
)
throw
Exception
(
"Second argument for function "
+
getName
()
+
" must be array."
,
ErrorCodes
::
ILLEGAL_TYPE_OF_ARGUMENT
);
return
arguments
[
0
]
->
clone
(
);
return
new
DataTypeArray
(
arguments
[
0
]
->
clone
()
);
}
/// Выполнить функцию над блоком.
...
...
@@ -620,7 +615,9 @@ class FunctionReplicate : public IFunction
array_column
=
typeid_cast
<
ColumnArray
*>
(
&*
temp_column
);
}
block
.
getByPosition
(
result
).
column
=
new
ColumnReplicated
(
first_column
->
size
(),
first_column
->
replicate
(
array_column
->
getOffsets
()));
block
.
getByPosition
(
result
).
column
=
new
ColumnArray
(
first_column
->
replicate
(
array_column
->
getOffsets
()),
array_column
->
getOffsetsColumn
());
}
};
...
...
dbms/include/DB/Interpreters/ProcessList.h
浏览文件 @
a2d9ab6e
...
...
@@ -2,7 +2,7 @@
#include <map>
#include <list>
#include <
Poco/SharedPtr.h
>
#include <
memory
>
#include <Poco/Mutex.h>
#include <Poco/Condition.h>
#include <Poco/Net/IPAddress.h>
...
...
@@ -13,6 +13,7 @@
#include <DB/Core/ErrorCodes.h>
#include <DB/Common/MemoryTracker.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Interpreters/QueryPriorities.h>
namespace
DB
...
...
@@ -36,13 +37,16 @@ struct ProcessListElement
MemoryTracker
memory_tracker
;
QueryPriorities
::
Handle
priority_handle
;
bool
is_cancelled
=
false
;
ProcessListElement
(
const
String
&
query_
,
const
String
&
user_
,
const
String
&
query_id_
,
const
Poco
::
Net
::
IPAddress
&
ip_address_
,
size_t
max_memory_usage
)
:
query
(
query_
),
user
(
user_
),
query_id
(
query_id_
),
ip_address
(
ip_address_
),
memory_tracker
(
max_memory_usage
)
size_t
max_memory_usage
,
QueryPriorities
::
Handle
&&
priority_handle_
)
:
query
(
query_
),
user
(
user_
),
query_id
(
query_id_
),
ip_address
(
ip_address_
),
memory_tracker
(
max_memory_usage
),
priority_handle
(
std
::
move
(
priority_handle_
))
{
current_memory_tracker
=
&
memory_tracker
;
}
...
...
@@ -55,126 +59,81 @@ struct ProcessListElement
bool
update
(
const
Progress
&
value
)
{
progress
.
incrementPiecewiseAtomically
(
value
);
if
(
priority_handle
)
priority_handle
->
waitIfNeed
(
std
::
chrono
::
seconds
(
1
));
/// NOTE Можно сделать настраиваемым таймаут.
return
!
is_cancelled
;
}
};
class
ProcessList
;
/// Держит итератор на список, и удаляет элемент из списка в деструкторе.
class
ProcessListEntry
{
private:
using
Container
=
std
::
list
<
ProcessListElement
>
;
ProcessList
&
parent
;
Container
::
iterator
it
;
public:
ProcessListEntry
(
ProcessList
&
parent_
,
Container
::
iterator
it_
)
:
parent
(
parent_
),
it
(
it_
)
{}
~
ProcessListEntry
();
ProcessListElement
*
operator
->
()
{
return
&*
it
;
}
const
ProcessListElement
*
operator
->
()
const
{
return
&*
it
;
}
ProcessListElement
&
get
()
{
return
*
it
;
}
const
ProcessListElement
&
get
()
const
{
return
*
it
;
}
};
class
ProcessList
{
friend
class
Entry
;
friend
class
ProcessList
Entry
;
public:
using
Element
=
ProcessListElement
;
using
Entry
=
ProcessListEntry
;
/// list, чтобы итераторы не инвалидировались. NOTE: можно заменить на cyclic buffer, но почти незачем.
typedef
std
::
list
<
Element
>
Containter
;
using
Container
=
std
::
list
<
Element
>
;
/// Query_id -> Element *
typedef
std
::
unordered_map
<
String
,
Element
*>
QueryToElement
;
using
QueryToElement
=
std
::
unordered_map
<
String
,
Element
*>
;
/// User -> Query_id -> Element *
typedef
std
::
unordered_map
<
String
,
QueryToElement
>
UserToQueries
;
using
UserToQueries
=
std
::
unordered_map
<
String
,
QueryToElement
>
;
private:
mutable
Poco
::
FastMutex
mutex
;
mutable
Poco
::
Condition
have_space
;
/// Количество одновременно выполняющихся запросов стало меньше максимального.
Contain
t
er
cont
;
Container
cont
;
size_t
cur_size
;
/// В C++03 std::list::size не O(1).
size_t
max_size
;
/// Если 0 - не ограничено. Иначе, если пытаемся добавить больше - кидается исключение.
UserToQueries
user_to_queries
;
/// Держит итератор на список, и удаляет элемент из списка в деструкторе.
class
Entry
{
private:
ProcessList
&
parent
;
Containter
::
iterator
it
;
public:
Entry
(
ProcessList
&
parent_
,
Containter
::
iterator
it_
)
:
parent
(
parent_
),
it
(
it_
)
{}
~
Entry
()
{
Poco
::
ScopedLock
<
Poco
::
FastMutex
>
lock
(
parent
.
mutex
);
/// В случае, если запрос отменяется, данные о нем удаляются из мапа в момент отмены.
if
(
!
it
->
is_cancelled
&&
!
it
->
query_id
.
empty
())
{
UserToQueries
::
iterator
queries
=
parent
.
user_to_queries
.
find
(
it
->
user
);
if
(
queries
!=
parent
.
user_to_queries
.
end
())
{
QueryToElement
::
iterator
element
=
queries
->
second
.
find
(
it
->
query_id
);
if
(
element
!=
queries
->
second
.
end
())
queries
->
second
.
erase
(
element
);
}
}
parent
.
cont
.
erase
(
it
);
--
parent
.
cur_size
;
parent
.
have_space
.
signal
();
}
Element
*
operator
->
()
{
return
&*
it
;
}
const
Element
*
operator
->
()
const
{
return
&*
it
;
}
Element
&
get
()
{
return
*
it
;
}
const
Element
&
get
()
const
{
return
*
it
;
}
};
QueryPriorities
priorities
;
public:
ProcessList
(
size_t
max_size_
=
0
)
:
cur_size
(
0
),
max_size
(
max_size_
)
{}
typedef
Poco
::
SharedPtr
<
Entry
>
EntryPtr
;
typedef
std
::
shared_ptr
<
ProcessList
Entry
>
EntryPtr
;
/** Зарегистрировать выполняющийся запрос. Возвращает refcounted объект, который удаляет запрос из списка при уничтожении.
* Если выполняющихся запросов сейчас слишком много - ждать не более указанного времени.
* Если времени не хватило - кинуть исключение.
*/
EntryPtr
insert
(
const
String
&
query_
,
const
String
&
user_
,
const
String
&
query_id_
,
const
Poco
::
Net
::
IPAddress
&
ip_address_
,
size_t
max_memory_usage
=
0
,
size_t
max_wait_milliseconds
=
DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS
,
bool
replace_running_query
=
false
)
{
EntryPtr
res
;
{
Poco
::
ScopedLock
<
Poco
::
FastMutex
>
lock
(
mutex
);
if
(
max_size
&&
cur_size
>=
max_size
&&
(
!
max_wait_milliseconds
||
!
have_space
.
tryWait
(
mutex
,
max_wait_milliseconds
)))
throw
Exception
(
"Too much simultaneous queries. Maximum: "
+
toString
(
max_size
),
ErrorCodes
::
TOO_MUCH_SIMULTANEOUS_QUERIES
);
if
(
!
query_id_
.
empty
())
{
UserToQueries
::
iterator
queries
=
user_to_queries
.
find
(
user_
);
if
(
queries
!=
user_to_queries
.
end
())
{
QueryToElement
::
iterator
element
=
queries
->
second
.
find
(
query_id_
);
if
(
element
!=
queries
->
second
.
end
())
{
if
(
!
replace_running_query
)
throw
Exception
(
"Query with id = "
+
query_id_
+
" is already running."
,
ErrorCodes
::
QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING
);
element
->
second
->
is_cancelled
=
true
;
/// В случае если запрос отменяется, данные о нем удаляются из мапа в момент отмены.
queries
->
second
.
erase
(
element
);
}
}
}
++
cur_size
;
res
=
new
Entry
(
*
this
,
cont
.
emplace
(
cont
.
end
(),
query_
,
user_
,
query_id_
,
ip_address_
,
max_memory_usage
));
if
(
!
query_id_
.
empty
())
user_to_queries
[
user_
][
query_id_
]
=
&
res
->
get
();
}
return
res
;
}
size_t
max_memory_usage
,
size_t
max_wait_milliseconds
,
bool
replace_running_query
,
QueryPriorities
::
Priority
priority
);
/// Количество одновременно выполняющихся запросов.
size_t
size
()
const
{
return
cur_size
;
}
/// Получить текущее состояние (копию) списка запросов.
Contain
t
er
get
()
const
Container
get
()
const
{
Poco
::
ScopedLock
<
Poco
::
FastMutex
>
lock
(
mutex
);
return
cont
;
...
...
dbms/include/DB/Interpreters/QueryPriorities.h
0 → 100644
浏览文件 @
a2d9ab6e
#pragma once
#include <map>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <chrono>
/** Реализует приоритеты запросов.
* Позволяет приостанавливать выполнение запроса, если выполняется хотя бы один более приоритетный запрос.
*
* Величина приоритета - целое число, чем меньше - тем больше приоритет.
*
* Приоритет 0 считается особенным - запросы с таким приоритетом выполняются всегда,
* не зависят от других запросов и не влияют на другие запросы.
* То есть 0 означает - не использовать приоритеты.
*
* NOTE Возможности сделать лучше:
* - реализовать ограничение на максимальное количество запросов с таким приоритетом.
*/
class
QueryPriorities
{
public:
using
Priority
=
int
;
private:
friend
struct
Handle
;
using
Count
=
int
;
/// Количество выполняющихся сейчас запросов с заданным приоритетом.
using
Container
=
std
::
map
<
Priority
,
Count
>
;
std
::
mutex
mutex
;
std
::
condition_variable
condvar
;
Container
container
;
/** Если есть более приоритетные запросы - спать, пока они не перестанут быть или не истечёт таймаут.
* Возвращает true, если более приоритетные запросы исчезли на момент возврата из функции, false, если истёк таймаут.
*/
template
<
typename
Duration
>
bool
waitIfNeed
(
Priority
priority
,
Duration
timeout
)
{
if
(
0
==
priority
)
return
true
;
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex
);
while
(
true
)
{
/// Если ли хотя бы один более приоритетный запрос?
bool
found
=
false
;
for
(
const
auto
&
value
:
container
)
{
if
(
value
.
first
>=
priority
)
break
;
if
(
value
.
second
>
0
)
{
found
=
true
;
break
;
}
}
if
(
!
found
)
return
true
;
if
(
std
::
cv_status
::
timeout
==
condvar
.
wait_for
(
lock
,
timeout
))
return
false
;
}
}
public:
struct
HandleImpl
{
private:
QueryPriorities
&
parent
;
QueryPriorities
::
Container
::
value_type
&
value
;
public:
HandleImpl
(
QueryPriorities
&
parent_
,
QueryPriorities
::
Container
::
value_type
&
value_
)
:
parent
(
parent_
),
value
(
value_
)
{}
~
HandleImpl
()
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
parent
.
mutex
);
--
value
.
second
;
}
parent
.
condvar
.
notify_all
();
}
template
<
typename
Duration
>
bool
waitIfNeed
(
Duration
timeout
)
{
return
parent
.
waitIfNeed
(
value
.
first
,
timeout
);
}
};
using
Handle
=
std
::
shared_ptr
<
HandleImpl
>
;
/** Зарегистрировать, что запрос с заданным приоритетом выполняется.
* Возвращается объект, в деструкторе которого, запись о запросе удаляется.
*/
Handle
insert
(
Priority
priority
)
{
if
(
0
==
priority
)
return
{};
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
auto
it
=
container
.
emplace
(
priority
,
0
).
first
;
++
it
->
second
;
return
std
::
make_shared
<
HandleImpl
>
(
*
this
,
*
it
);
}
};
dbms/include/DB/Interpreters/Settings.h
浏览文件 @
a2d9ab6e
...
...
@@ -132,6 +132,9 @@ struct Settings
\
/** Позволяет выбирать метод сжатия данных при записи */
\
M(SettingCompressionMethod, network_compression_method, CompressionMethod::LZ4) \
\
/** Приоритет запроса. 1 - самый высокий, больше - ниже; 0 - не использовать приоритеты. */
\
M(SettingUInt64, priority, 0) \
/// Всевозможные ограничения на выполнение запроса.
Limits
limits
;
...
...
dbms/src/DataStreams/BlockIO.cpp
0 → 100644
浏览文件 @
a2d9ab6e
#include <DB/Interpreters/ProcessList.h>
#include <DB/DataStreams/BlockIO.h>
namespace
DB
{
BlockIO
::~
BlockIO
()
=
default
;
}
dbms/src/DataStreams/IProfilingBlockInputStream.cpp
浏览文件 @
a2d9ab6e
...
...
@@ -5,6 +5,7 @@
#include <DB/Columns/ColumnConst.h>
#include <DB/Interpreters/Quota.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
...
...
@@ -320,7 +321,7 @@ void IProfilingBlockInputStream::setProgressCallback(ProgressCallback callback)
}
void
IProfilingBlockInputStream
::
setProcessListElement
(
ProcessList
::
Element
*
elem
)
void
IProfilingBlockInputStream
::
setProcessListElement
(
ProcessListElement
*
elem
)
{
process_list_elem
=
elem
;
...
...
dbms/src/Functions/FunctionsMiscellaneous.cpp
浏览文件 @
a2d9ab6e
...
...
@@ -326,6 +326,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
factory
.
registerFunction
<
FunctionMaterialize
>
();
factory
.
registerFunction
<
FunctionIgnore
>
();
factory
.
registerFunction
<
FunctionArrayJoin
>
();
factory
.
registerFunction
<
FunctionReplicate
>
();
factory
.
registerFunction
<
FunctionBar
>
();
factory
.
registerFunction
<
FunctionTuple
>
();
...
...
dbms/src/Interpreters/ProcessList.cpp
0 → 100644
浏览文件 @
a2d9ab6e
#include <DB/Interpreters/ProcessList.h>
namespace
DB
{
ProcessList
::
EntryPtr
ProcessList
::
insert
(
const
String
&
query_
,
const
String
&
user_
,
const
String
&
query_id_
,
const
Poco
::
Net
::
IPAddress
&
ip_address_
,
size_t
max_memory_usage
,
size_t
max_wait_milliseconds
,
bool
replace_running_query
,
QueryPriorities
::
Priority
priority
)
{
EntryPtr
res
;
{
Poco
::
ScopedLock
<
Poco
::
FastMutex
>
lock
(
mutex
);
if
(
max_size
&&
cur_size
>=
max_size
&&
(
!
max_wait_milliseconds
||
!
have_space
.
tryWait
(
mutex
,
max_wait_milliseconds
)))
throw
Exception
(
"Too much simultaneous queries. Maximum: "
+
toString
(
max_size
),
ErrorCodes
::
TOO_MUCH_SIMULTANEOUS_QUERIES
);
if
(
!
query_id_
.
empty
())
{
UserToQueries
::
iterator
queries
=
user_to_queries
.
find
(
user_
);
if
(
queries
!=
user_to_queries
.
end
())
{
QueryToElement
::
iterator
element
=
queries
->
second
.
find
(
query_id_
);
if
(
element
!=
queries
->
second
.
end
())
{
if
(
!
replace_running_query
)
throw
Exception
(
"Query with id = "
+
query_id_
+
" is already running."
,
ErrorCodes
::
QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING
);
element
->
second
->
is_cancelled
=
true
;
/// В случае если запрос отменяется, данные о нем удаляются из мапа в момент отмены.
queries
->
second
.
erase
(
element
);
}
}
}
++
cur_size
;
res
.
reset
(
new
Entry
(
*
this
,
cont
.
emplace
(
cont
.
end
(),
query_
,
user_
,
query_id_
,
ip_address_
,
max_memory_usage
,
priorities
.
insert
(
priority
))));
if
(
!
query_id_
.
empty
())
user_to_queries
[
user_
][
query_id_
]
=
&
res
->
get
();
}
return
res
;
}
ProcessListEntry
::~
ProcessListEntry
()
{
Poco
::
ScopedLock
<
Poco
::
FastMutex
>
lock
(
parent
.
mutex
);
/// В случае, если запрос отменяется, данные о нем удаляются из мапа в момент отмены.
if
(
!
it
->
is_cancelled
&&
!
it
->
query_id
.
empty
())
{
ProcessList
::
UserToQueries
::
iterator
queries
=
parent
.
user_to_queries
.
find
(
it
->
user
);
if
(
queries
!=
parent
.
user_to_queries
.
end
())
{
ProcessList
::
QueryToElement
::
iterator
element
=
queries
->
second
.
find
(
it
->
query_id
);
if
(
element
!=
queries
->
second
.
end
())
queries
->
second
.
erase
(
element
);
}
}
parent
.
cont
.
erase
(
it
);
--
parent
.
cur_size
;
parent
.
have_space
.
signal
();
}
}
dbms/src/Interpreters/executeQuery.cpp
浏览文件 @
a2d9ab6e
...
...
@@ -14,6 +14,7 @@
#include <DB/Interpreters/Quota.h>
#include <DB/Interpreters/InterpreterFactory.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Interpreters/executeQuery.h>
...
...
@@ -88,11 +89,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
ProcessList
::
EntryPtr
process_list_entry
;
if
(
!
internal
&&
nullptr
==
typeid_cast
<
const
ASTShowProcesslistQuery
*>
(
&*
ast
))
{
const
Settings
&
settings
=
context
.
getSettingsRef
();
process_list_entry
=
context
.
getProcessList
().
insert
(
query
,
context
.
getUser
(),
context
.
getCurrentQueryId
(),
context
.
getIPAddress
(),
context
.
getSettingsRef
().
limits
.
max_memory_usage
,
context
.
getSettingsRef
().
queue_max_wait_ms
.
totalMilliseconds
(),
context
.
getSettingsRef
().
replace_running_query
);
settings
.
limits
.
max_memory_usage
,
settings
.
queue_max_wait_ms
.
totalMilliseconds
(),
settings
.
replace_running_query
,
settings
.
priority
);
context
.
setProcessListElement
(
&
process_list_entry
->
get
());
}
...
...
dbms/src/Server/Server.cpp
浏览文件 @
a2d9ab6e
...
...
@@ -20,6 +20,7 @@
#include <DB/Common/Macros.h>
#include <DB/Common/getFQDNOrHostName.h>
#include <DB/Interpreters/loadMetadata.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Storages/StorageSystemNumbers.h>
#include <DB/Storages/StorageSystemTables.h>
#include <DB/Storages/StorageSystemParts.h>
...
...
dbms/src/Storages/StorageSystemProcesses.cpp
浏览文件 @
a2d9ab6e
...
...
@@ -2,6 +2,7 @@
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Storages/StorageSystemProcesses.h>
...
...
dbms/tests/queries/0_stateless/00177_inserts_through_http_parts.sh
浏览文件 @
a2d9ab6e
#!/bin/bash
curl
'http://localhost:8123/?query=DROP+TABLE'
-d
'IF EXISTS test.insert'
curl
'http://localhost:8123/?query=CREATE'
-d
'TABLE test.insert (x UInt8) ENGINE = Memory'
curl
'http://localhost:8123/'
-d
'INSERT INTO test.insert VALUES (1),(2)'
curl
'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES'
-d
'(3),(4)'
curl
'http://localhost:8123/?query=INSERT+INTO+test.insert'
-d
'VALUES (5),(6)'
curl
'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES+(7)'
-d
',(8)'
curl
'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES+(9),(10)'
-d
' '
curl
'http://localhost:8123/'
-d
'SELECT x FROM test.insert ORDER BY x'
curl
'http://localhost:8123/?query=DROP+TABLE'
-d
'test.insert'
curl
-sS
'http://localhost:8123/?query=DROP+TABLE'
-d
'IF EXISTS test.insert'
curl
-sS
'http://localhost:8123/?query=CREATE'
-d
'TABLE test.insert (x UInt8) ENGINE = Memory'
curl
-sS
'http://localhost:8123/'
-d
'INSERT INTO test.insert VALUES (1),(2)'
curl
-sS
'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES'
-d
'(3),(4)'
curl
-sS
'http://localhost:8123/?query=INSERT+INTO+test.insert'
-d
'VALUES (5),(6)'
curl
-sS
'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES+(7)'
-d
',(8)'
curl
-sS
'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES+(9),(10)'
-d
' '
curl
-sS
'http://localhost:8123/'
-d
'SELECT x FROM test.insert ORDER BY x'
curl
-sS
'http://localhost:8123/?query=DROP+TABLE'
-d
'test.insert'
dbms/tests/queries/0_stateless/00178_function_replicate.reference
0 → 100644
浏览文件 @
a2d9ab6e
0 [] [] [] [] []
1 [0] [1] ['1'] [[0]] [['0']]
2 [0,1] [2,2] ['2','2'] [[0,1],[0,1]] [['0','1'],['0','1']]
3 [0,1,2] [3,3,3] ['3','3','3'] [[0,1,2],[0,1,2],[0,1,2]] [['0','1','2'],['0','1','2'],['0','1','2']]
4 [0,1,2,3] [4,4,4,4] ['4','4','4','4'] [[0,1,2,3],[0,1,2,3],[0,1,2,3],[0,1,2,3]] [['0','1','2','3'],['0','1','2','3'],['0','1','2','3'],['0','1','2','3']]
5 [0,1,2,3,4] [5,5,5,5,5] ['5','5','5','5','5'] [[0,1,2,3,4],[0,1,2,3,4],[0,1,2,3,4],[0,1,2,3,4],[0,1,2,3,4]] [['0','1','2','3','4'],['0','1','2','3','4'],['0','1','2','3','4'],['0','1','2','3','4'],['0','1','2','3','4']]
6 [0,1,2,3,4,5] [6,6,6,6,6,6] ['6','6','6','6','6','6'] [[0,1,2,3,4,5],[0,1,2,3,4,5],[0,1,2,3,4,5],[0,1,2,3,4,5],[0,1,2,3,4,5],[0,1,2,3,4,5]] [['0','1','2','3','4','5'],['0','1','2','3','4','5'],['0','1','2','3','4','5'],['0','1','2','3','4','5'],['0','1','2','3','4','5'],['0','1','2','3','4','5']]
7 [0,1,2,3,4,5,6] [7,7,7,7,7,7,7] ['7','7','7','7','7','7','7'] [[0,1,2,3,4,5,6],[0,1,2,3,4,5,6],[0,1,2,3,4,5,6],[0,1,2,3,4,5,6],[0,1,2,3,4,5,6],[0,1,2,3,4,5,6],[0,1,2,3,4,5,6]] [['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6']]
8 [0,1,2,3,4,5,6,7] [8,8,8,8,8,8,8,8] ['8','8','8','8','8','8','8','8'] [[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7]] [['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7']]
9 [0,1,2,3,4,5,6,7,8] [9,9,9,9,9,9,9,9,9] ['9','9','9','9','9','9','9','9','9'] [[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8]] [['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8']]
dbms/tests/queries/0_stateless/00178_function_replicate.sql
0 → 100644
浏览文件 @
a2d9ab6e
SELECT
number
,
range
(
number
)
AS
arr
,
replicate
(
number
,
arr
),
replicate
(
toString
(
number
),
arr
),
replicate
(
range
(
number
),
arr
),
replicate
(
arrayMap
(
x
->
toString
(
x
),
range
(
number
)),
arr
)
FROM
system
.
numbers
LIMIT
10
;
dbms/tests/queries/0_stateless/00179_lambdas_with_common_expressions_and_filter.reference
0 → 100644
浏览文件 @
a2d9ab6e
[0]
[0,1,2]
[0,1,2,3,4]
[0,1,2,3,4,5,6]
[0,1,2,3,4,5,6,7,8]
dbms/tests/queries/0_stateless/00179_lambdas_with_common_expressions_and_filter.sql
0 → 100644
浏览文件 @
a2d9ab6e
SELECT
arrayMap
(
x
->
number
!=
-
1
?
x
:
0
,
arr
)
FROM
(
SELECT
number
,
range
(
number
)
AS
arr
FROM
system
.
numbers
LIMIT
10
)
WHERE
number
%
2
=
1
AND
arrayExists
(
x
->
number
!=
-
1
,
arr
);
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录