Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
389958d7
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
389958d7
编写于
6月 23, 2015
作者:
E
Evgeniy Gatov
浏览文件
操作
浏览文件
下载
差异文件
Merge
上级
0c964bf1
8ab76848
变更
28
隐藏空白更改
内联
并排
Showing
28 changed file
with
606 addition
and
175 deletion
+606
-175
dbms/include/DB/Columns/ColumnReplicated.h
dbms/include/DB/Columns/ColumnReplicated.h
+0
-25
dbms/include/DB/Columns/IColumnDummy.h
dbms/include/DB/Columns/IColumnDummy.h
+1
-6
dbms/include/DB/DataStreams/BlockIO.h
dbms/include/DB/DataStreams/BlockIO.h
+5
-2
dbms/include/DB/DataStreams/IProfilingBlockInputStream.h
dbms/include/DB/DataStreams/IProfilingBlockInputStream.h
+3
-3
dbms/include/DB/Functions/FunctionsHigherOrder.h
dbms/include/DB/Functions/FunctionsHigherOrder.h
+1
-2
dbms/include/DB/Functions/FunctionsMiscellaneous.h
dbms/include/DB/Functions/FunctionsMiscellaneous.h
+8
-11
dbms/include/DB/Interpreters/ProcessList.h
dbms/include/DB/Interpreters/ProcessList.h
+46
-87
dbms/include/DB/Interpreters/QueryLog.h
dbms/include/DB/Interpreters/QueryLog.h
+245
-0
dbms/include/DB/Interpreters/QueryPriorities.h
dbms/include/DB/Interpreters/QueryPriorities.h
+117
-0
dbms/include/DB/Interpreters/Settings.h
dbms/include/DB/Interpreters/Settings.h
+3
-0
dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h
dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h
+0
-4
dbms/include/DB/Storages/StorageReplicatedMergeTree.h
dbms/include/DB/Storages/StorageReplicatedMergeTree.h
+1
-1
dbms/src/DataStreams/BlockIO.cpp
dbms/src/DataStreams/BlockIO.cpp
+9
-0
dbms/src/DataStreams/IProfilingBlockInputStream.cpp
dbms/src/DataStreams/IProfilingBlockInputStream.cpp
+2
-1
dbms/src/Functions/FunctionsMiscellaneous.cpp
dbms/src/Functions/FunctionsMiscellaneous.cpp
+1
-0
dbms/src/Interpreters/ProcessList.cpp
dbms/src/Interpreters/ProcessList.cpp
+72
-0
dbms/src/Interpreters/executeQuery.cpp
dbms/src/Interpreters/executeQuery.cpp
+7
-3
dbms/src/Parsers/ParserAlterQuery.cpp
dbms/src/Parsers/ParserAlterQuery.cpp
+6
-0
dbms/src/Server/Server.cpp
dbms/src/Server/Server.cpp
+1
-0
dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+11
-11
dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp
dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp
+23
-0
dbms/src/Storages/StorageReplicatedMergeTree.cpp
dbms/src/Storages/StorageReplicatedMergeTree.cpp
+7
-10
dbms/src/Storages/StorageSystemProcesses.cpp
dbms/src/Storages/StorageSystemProcesses.cpp
+1
-0
dbms/tests/queries/0_stateless/00177_inserts_through_http_parts.sh
...s/queries/0_stateless/00177_inserts_through_http_parts.sh
+9
-9
dbms/tests/queries/0_stateless/00178_function_replicate.reference
...ts/queries/0_stateless/00178_function_replicate.reference
+10
-0
dbms/tests/queries/0_stateless/00178_function_replicate.sql
dbms/tests/queries/0_stateless/00178_function_replicate.sql
+9
-0
dbms/tests/queries/0_stateless/00179_lambdas_with_common_expressions_and_filter.reference
...0179_lambdas_with_common_expressions_and_filter.reference
+5
-0
dbms/tests/queries/0_stateless/00179_lambdas_with_common_expressions_and_filter.sql
...less/00179_lambdas_with_common_expressions_and_filter.sql
+3
-0
未找到文件。
dbms/include/DB/Columns/ColumnReplicated.h
已删除
100644 → 0
浏览文件 @
0c964bf1
#pragma once
#include <DB/Columns/IColumnDummy.h>
namespace
DB
{
/** Содержит промежуточные данные для вычисления выражений в функциях высшего порядка.
* Это - вложенный столбец произвольного размера.
* Сам ColumnReplicated притворяется, как столбец указанного в конструкторе размера.
*/
class
ColumnReplicated
final
:
public
IColumnDummy
{
public:
ColumnReplicated
(
size_t
s_
,
ColumnPtr
nested_
)
:
IColumnDummy
(
s_
),
nested
(
nested_
)
{}
std
::
string
getName
()
const
override
{
return
"ColumnReplicated"
;
}
ColumnPtr
cloneDummy
(
size_t
s_
)
const
override
{
return
new
ColumnReplicated
(
s_
,
nested
);
}
ColumnPtr
&
getData
()
{
return
nested
;
}
private:
ColumnPtr
nested
;
};
}
dbms/include/DB/Columns/IColumnDummy.h
浏览文件 @
389958d7
...
@@ -41,12 +41,7 @@ public:
...
@@ -41,12 +41,7 @@ public:
ColumnPtr
filter
(
const
Filter
&
filt
)
const
override
ColumnPtr
filter
(
const
Filter
&
filt
)
const
override
{
{
size_t
new_size
=
0
;
return
cloneDummy
(
countBytesInFilter
(
filt
));
for
(
Filter
::
const_iterator
it
=
filt
.
begin
();
it
!=
filt
.
end
();
++
it
)
if
(
*
it
)
++
new_size
;
return
cloneDummy
(
new_size
);
}
}
ColumnPtr
permute
(
const
Permutation
&
perm
,
size_t
limit
)
const
override
ColumnPtr
permute
(
const
Permutation
&
perm
,
size_t
limit
)
const
override
...
...
dbms/include/DB/DataStreams/BlockIO.h
浏览文件 @
389958d7
...
@@ -2,12 +2,13 @@
...
@@ -2,12 +2,13 @@
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/Interpreters/ProcessList.h>
namespace
DB
namespace
DB
{
{
class
ProcessListEntry
;
struct
BlockIO
struct
BlockIO
{
{
/** process_list_entry должен уничтожаться позже, чем in и out,
/** process_list_entry должен уничтожаться позже, чем in и out,
...
@@ -15,7 +16,7 @@ struct BlockIO
...
@@ -15,7 +16,7 @@ struct BlockIO
* (MemoryTracker * current_memory_tracker),
* (MemoryTracker * current_memory_tracker),
* которая может использоваться до уничтожения in и out.
* которая может использоваться до уничтожения in и out.
*/
*/
ProcessList
::
EntryPtr
process_list_entry
;
std
::
shared_ptr
<
ProcessListEntry
>
process_list_entry
;
BlockInputStreamPtr
in
;
BlockInputStreamPtr
in
;
BlockOutputStreamPtr
out
;
BlockOutputStreamPtr
out
;
...
@@ -38,6 +39,8 @@ struct BlockIO
...
@@ -38,6 +39,8 @@ struct BlockIO
return
*
this
;
return
*
this
;
}
}
~
BlockIO
();
};
};
}
}
dbms/include/DB/DataStreams/IProfilingBlockInputStream.h
浏览文件 @
389958d7
...
@@ -3,7 +3,6 @@
...
@@ -3,7 +3,6 @@
#include <DB/Core/Progress.h>
#include <DB/Core/Progress.h>
#include <DB/Interpreters/Limits.h>
#include <DB/Interpreters/Limits.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/DataStreams/BlockStreamProfileInfo.h>
#include <DB/DataStreams/BlockStreamProfileInfo.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IBlockInputStream.h>
...
@@ -14,6 +13,7 @@ namespace DB
...
@@ -14,6 +13,7 @@ namespace DB
{
{
class
QuotaForIntervals
;
class
QuotaForIntervals
;
class
ProcessListElement
;
/** Смотрит за тем, как работает источник блоков.
/** Смотрит за тем, как работает источник блоков.
...
@@ -82,7 +82,7 @@ public:
...
@@ -82,7 +82,7 @@ public:
* На основе этой информации будет проверяться квота, и некоторые ограничения.
* На основе этой информации будет проверяться квота, и некоторые ограничения.
* Также эта информация будет доступна в запросе SHOW PROCESSLIST.
* Также эта информация будет доступна в запросе SHOW PROCESSLIST.
*/
*/
void
setProcessListElement
(
ProcessList
::
Element
*
elem
);
void
setProcessListElement
(
ProcessListElement
*
elem
);
/** Установить информацию о приблизительном общем количестве строк, которых нужно прочитать.
/** Установить информацию о приблизительном общем количестве строк, которых нужно прочитать.
*/
*/
...
@@ -154,7 +154,7 @@ protected:
...
@@ -154,7 +154,7 @@ protected:
BlockStreamProfileInfo
info
;
BlockStreamProfileInfo
info
;
std
::
atomic
<
bool
>
is_cancelled
{
false
};
std
::
atomic
<
bool
>
is_cancelled
{
false
};
ProgressCallback
progress_callback
;
ProgressCallback
progress_callback
;
ProcessList
::
Element
*
process_list_elem
=
nullptr
;
ProcessListElement
*
process_list_elem
=
nullptr
;
bool
enabled_extremes
=
false
;
bool
enabled_extremes
=
false
;
...
...
dbms/include/DB/Functions/FunctionsHigherOrder.h
浏览文件 @
389958d7
...
@@ -5,7 +5,6 @@
...
@@ -5,7 +5,6 @@
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnReplicated.h>
#include <DB/Columns/ColumnExpression.h>
#include <DB/Columns/ColumnExpression.h>
#include <DB/Functions/IFunction.h>
#include <DB/Functions/IFunction.h>
...
@@ -580,7 +579,7 @@ public:
...
@@ -580,7 +579,7 @@ public:
ColumnWithNameAndType
replicated_column
=
block
.
getByPosition
(
prerequisites
[
prerequisite_index
]);
ColumnWithNameAndType
replicated_column
=
block
.
getByPosition
(
prerequisites
[
prerequisite_index
]);
replicated_column
.
name
=
name
;
replicated_column
.
name
=
name
;
replicated_column
.
column
=
typeid_cast
<
Column
Replicated
&>
(
*
replicated_column
.
column
).
getData
();
replicated_column
.
column
=
typeid_cast
<
Column
Array
&>
(
*
replicated_column
.
column
).
getDataPtr
();
temp_block
.
insert
(
replicated_column
);
temp_block
.
insert
(
replicated_column
);
++
prerequisite_index
;
++
prerequisite_index
;
...
...
dbms/include/DB/Functions/FunctionsMiscellaneous.h
浏览文件 @
389958d7
...
@@ -20,7 +20,6 @@
...
@@ -20,7 +20,6 @@
#include <DB/Columns/ColumnSet.h>
#include <DB/Columns/ColumnSet.h>
#include <DB/Columns/ColumnTuple.h>
#include <DB/Columns/ColumnTuple.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnReplicated.h>
#include <DB/Columns/ColumnAggregateFunction.h>
#include <DB/Columns/ColumnAggregateFunction.h>
#include <DB/Common/UnicodeBar.h>
#include <DB/Common/UnicodeBar.h>
#include <DB/Functions/IFunction.h>
#include <DB/Functions/IFunction.h>
...
@@ -51,9 +50,8 @@ namespace DB
...
@@ -51,9 +50,8 @@ namespace DB
* arrayJoin(arr) - особая функция - выполнить её напрямую нельзя;
* arrayJoin(arr) - особая функция - выполнить её напрямую нельзя;
* используется только чтобы получить тип результата соответствующего выражения.
* используется только чтобы получить тип результата соответствующего выражения.
*
*
* replicate(x, arr) - копирует x столько раз, сколько элементов в массиве arr;
* replicate(x, arr) - создаёт массив такого же размера как arr, все элементы которого равны x;
* например: replicate(1, ['a', 'b', 'c']) = 1, 1, 1.
* например: replicate(1, ['a', 'b', 'c']) = [1, 1, 1].
* не предназначена для пользователя, а используется только как prerequisites для функций высшего порядка.
*
*
* sleep(n) - спит n секунд каждый блок.
* sleep(n) - спит n секунд каждый блок.
*
*
...
@@ -570,18 +568,15 @@ public:
...
@@ -570,18 +568,15 @@ public:
};
};
/** Размножает столбец (первый аргумент) по количеству элементов в массиве (втором аргументе).
/** Создаёт массив, размножая столбец (первый аргумент) по количеству элементов в массиве (втором аргументе).
* Не предназначена для внешнего использования.
* Так как возвращаемый столбец будет иметь несовпадающий размер с исходными,
* то результат не может быть потом использован в том же блоке, что и аргументы.
* Используется только в качестве prerequisites для функций высшего порядка.
* Используется только в качестве prerequisites для функций высшего порядка.
*/
*/
class
FunctionReplicate
:
public
IFunction
class
FunctionReplicate
:
public
IFunction
{
{
public:
static
constexpr
auto
name
=
"replicate"
;
static
constexpr
auto
name
=
"replicate"
;
static
IFunction
*
create
(
const
Context
&
context
)
{
return
new
FunctionReplicate
;
}
static
IFunction
*
create
(
const
Context
&
context
)
{
return
new
FunctionReplicate
;
}
/// Получить имя функции.
/// Получить имя функции.
String
getName
()
const
String
getName
()
const
{
{
...
@@ -600,7 +595,7 @@ class FunctionReplicate : public IFunction
...
@@ -600,7 +595,7 @@ class FunctionReplicate : public IFunction
if
(
!
array_type
)
if
(
!
array_type
)
throw
Exception
(
"Second argument for function "
+
getName
()
+
" must be array."
,
ErrorCodes
::
ILLEGAL_TYPE_OF_ARGUMENT
);
throw
Exception
(
"Second argument for function "
+
getName
()
+
" must be array."
,
ErrorCodes
::
ILLEGAL_TYPE_OF_ARGUMENT
);
return
arguments
[
0
]
->
clone
(
);
return
new
DataTypeArray
(
arguments
[
0
]
->
clone
()
);
}
}
/// Выполнить функцию над блоком.
/// Выполнить функцию над блоком.
...
@@ -620,7 +615,9 @@ class FunctionReplicate : public IFunction
...
@@ -620,7 +615,9 @@ class FunctionReplicate : public IFunction
array_column
=
typeid_cast
<
ColumnArray
*>
(
&*
temp_column
);
array_column
=
typeid_cast
<
ColumnArray
*>
(
&*
temp_column
);
}
}
block
.
getByPosition
(
result
).
column
=
new
ColumnReplicated
(
first_column
->
size
(),
first_column
->
replicate
(
array_column
->
getOffsets
()));
block
.
getByPosition
(
result
).
column
=
new
ColumnArray
(
first_column
->
replicate
(
array_column
->
getOffsets
()),
array_column
->
getOffsetsColumn
());
}
}
};
};
...
...
dbms/include/DB/Interpreters/ProcessList.h
浏览文件 @
389958d7
...
@@ -2,7 +2,7 @@
...
@@ -2,7 +2,7 @@
#include <map>
#include <map>
#include <list>
#include <list>
#include <
Poco/SharedPtr.h
>
#include <
memory
>
#include <Poco/Mutex.h>
#include <Poco/Mutex.h>
#include <Poco/Condition.h>
#include <Poco/Condition.h>
#include <Poco/Net/IPAddress.h>
#include <Poco/Net/IPAddress.h>
...
@@ -13,6 +13,7 @@
...
@@ -13,6 +13,7 @@
#include <DB/Core/ErrorCodes.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Common/MemoryTracker.h>
#include <DB/Common/MemoryTracker.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Interpreters/QueryPriorities.h>
namespace
DB
namespace
DB
...
@@ -36,13 +37,16 @@ struct ProcessListElement
...
@@ -36,13 +37,16 @@ struct ProcessListElement
MemoryTracker
memory_tracker
;
MemoryTracker
memory_tracker
;
QueryPriorities
::
Handle
priority_handle
;
bool
is_cancelled
=
false
;
bool
is_cancelled
=
false
;
ProcessListElement
(
const
String
&
query_
,
const
String
&
user_
,
ProcessListElement
(
const
String
&
query_
,
const
String
&
user_
,
const
String
&
query_id_
,
const
Poco
::
Net
::
IPAddress
&
ip_address_
,
const
String
&
query_id_
,
const
Poco
::
Net
::
IPAddress
&
ip_address_
,
size_t
max_memory_usage
)
size_t
max_memory_usage
,
QueryPriorities
::
Handle
&&
priority_handle_
)
:
query
(
query_
),
user
(
user_
),
query_id
(
query_id_
),
ip_address
(
ip_address_
),
memory_tracker
(
max_memory_usage
)
:
query
(
query_
),
user
(
user_
),
query_id
(
query_id_
),
ip_address
(
ip_address_
),
memory_tracker
(
max_memory_usage
),
priority_handle
(
std
::
move
(
priority_handle_
))
{
{
current_memory_tracker
=
&
memory_tracker
;
current_memory_tracker
=
&
memory_tracker
;
}
}
...
@@ -55,126 +59,81 @@ struct ProcessListElement
...
@@ -55,126 +59,81 @@ struct ProcessListElement
bool
update
(
const
Progress
&
value
)
bool
update
(
const
Progress
&
value
)
{
{
progress
.
incrementPiecewiseAtomically
(
value
);
progress
.
incrementPiecewiseAtomically
(
value
);
if
(
priority_handle
)
priority_handle
->
waitIfNeed
(
std
::
chrono
::
seconds
(
1
));
/// NOTE Можно сделать настраиваемым таймаут.
return
!
is_cancelled
;
return
!
is_cancelled
;
}
}
};
};
class
ProcessList
;
/// Держит итератор на список, и удаляет элемент из списка в деструкторе.
class
ProcessListEntry
{
private:
using
Container
=
std
::
list
<
ProcessListElement
>
;
ProcessList
&
parent
;
Container
::
iterator
it
;
public:
ProcessListEntry
(
ProcessList
&
parent_
,
Container
::
iterator
it_
)
:
parent
(
parent_
),
it
(
it_
)
{}
~
ProcessListEntry
();
ProcessListElement
*
operator
->
()
{
return
&*
it
;
}
const
ProcessListElement
*
operator
->
()
const
{
return
&*
it
;
}
ProcessListElement
&
get
()
{
return
*
it
;
}
const
ProcessListElement
&
get
()
const
{
return
*
it
;
}
};
class
ProcessList
class
ProcessList
{
{
friend
class
Entry
;
friend
class
ProcessList
Entry
;
public:
public:
using
Element
=
ProcessListElement
;
using
Element
=
ProcessListElement
;
using
Entry
=
ProcessListEntry
;
/// list, чтобы итераторы не инвалидировались. NOTE: можно заменить на cyclic buffer, но почти незачем.
/// list, чтобы итераторы не инвалидировались. NOTE: можно заменить на cyclic buffer, но почти незачем.
typedef
std
::
list
<
Element
>
Containter
;
using
Container
=
std
::
list
<
Element
>
;
/// Query_id -> Element *
/// Query_id -> Element *
typedef
std
::
unordered_map
<
String
,
Element
*>
QueryToElement
;
using
QueryToElement
=
std
::
unordered_map
<
String
,
Element
*>
;
/// User -> Query_id -> Element *
/// User -> Query_id -> Element *
typedef
std
::
unordered_map
<
String
,
QueryToElement
>
UserToQueries
;
using
UserToQueries
=
std
::
unordered_map
<
String
,
QueryToElement
>
;
private:
private:
mutable
Poco
::
FastMutex
mutex
;
mutable
Poco
::
FastMutex
mutex
;
mutable
Poco
::
Condition
have_space
;
/// Количество одновременно выполняющихся запросов стало меньше максимального.
mutable
Poco
::
Condition
have_space
;
/// Количество одновременно выполняющихся запросов стало меньше максимального.
Contain
t
er
cont
;
Container
cont
;
size_t
cur_size
;
/// В C++03 std::list::size не O(1).
size_t
cur_size
;
/// В C++03 std::list::size не O(1).
size_t
max_size
;
/// Если 0 - не ограничено. Иначе, если пытаемся добавить больше - кидается исключение.
size_t
max_size
;
/// Если 0 - не ограничено. Иначе, если пытаемся добавить больше - кидается исключение.
UserToQueries
user_to_queries
;
UserToQueries
user_to_queries
;
QueryPriorities
priorities
;
/// Держит итератор на список, и удаляет элемент из списка в деструкторе.
class
Entry
{
private:
ProcessList
&
parent
;
Containter
::
iterator
it
;
public:
Entry
(
ProcessList
&
parent_
,
Containter
::
iterator
it_
)
:
parent
(
parent_
),
it
(
it_
)
{}
~
Entry
()
{
Poco
::
ScopedLock
<
Poco
::
FastMutex
>
lock
(
parent
.
mutex
);
/// В случае, если запрос отменяется, данные о нем удаляются из мапа в момент отмены.
if
(
!
it
->
is_cancelled
&&
!
it
->
query_id
.
empty
())
{
UserToQueries
::
iterator
queries
=
parent
.
user_to_queries
.
find
(
it
->
user
);
if
(
queries
!=
parent
.
user_to_queries
.
end
())
{
QueryToElement
::
iterator
element
=
queries
->
second
.
find
(
it
->
query_id
);
if
(
element
!=
queries
->
second
.
end
())
queries
->
second
.
erase
(
element
);
}
}
parent
.
cont
.
erase
(
it
);
--
parent
.
cur_size
;
parent
.
have_space
.
signal
();
}
Element
*
operator
->
()
{
return
&*
it
;
}
const
Element
*
operator
->
()
const
{
return
&*
it
;
}
Element
&
get
()
{
return
*
it
;
}
const
Element
&
get
()
const
{
return
*
it
;
}
};
public:
public:
ProcessList
(
size_t
max_size_
=
0
)
:
cur_size
(
0
),
max_size
(
max_size_
)
{}
ProcessList
(
size_t
max_size_
=
0
)
:
cur_size
(
0
),
max_size
(
max_size_
)
{}
typedef
Poco
::
SharedPtr
<
Entry
>
EntryPtr
;
typedef
std
::
shared_ptr
<
ProcessList
Entry
>
EntryPtr
;
/** Зарегистрировать выполняющийся запрос. Возвращает refcounted объект, который удаляет запрос из списка при уничтожении.
/** Зарегистрировать выполняющийся запрос. Возвращает refcounted объект, который удаляет запрос из списка при уничтожении.
* Если выполняющихся запросов сейчас слишком много - ждать не более указанного времени.
* Если выполняющихся запросов сейчас слишком много - ждать не более указанного времени.
* Если времени не хватило - кинуть исключение.
* Если времени не хватило - кинуть исключение.
*/
*/
EntryPtr
insert
(
const
String
&
query_
,
const
String
&
user_
,
const
String
&
query_id_
,
const
Poco
::
Net
::
IPAddress
&
ip_address_
,
EntryPtr
insert
(
const
String
&
query_
,
const
String
&
user_
,
const
String
&
query_id_
,
const
Poco
::
Net
::
IPAddress
&
ip_address_
,
size_t
max_memory_usage
=
0
,
size_t
max_wait_milliseconds
=
DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS
,
bool
replace_running_query
=
false
)
size_t
max_memory_usage
,
size_t
max_wait_milliseconds
,
bool
replace_running_query
,
QueryPriorities
::
Priority
priority
);
{
EntryPtr
res
;
{
Poco
::
ScopedLock
<
Poco
::
FastMutex
>
lock
(
mutex
);
if
(
max_size
&&
cur_size
>=
max_size
&&
(
!
max_wait_milliseconds
||
!
have_space
.
tryWait
(
mutex
,
max_wait_milliseconds
)))
throw
Exception
(
"Too much simultaneous queries. Maximum: "
+
toString
(
max_size
),
ErrorCodes
::
TOO_MUCH_SIMULTANEOUS_QUERIES
);
if
(
!
query_id_
.
empty
())
{
UserToQueries
::
iterator
queries
=
user_to_queries
.
find
(
user_
);
if
(
queries
!=
user_to_queries
.
end
())
{
QueryToElement
::
iterator
element
=
queries
->
second
.
find
(
query_id_
);
if
(
element
!=
queries
->
second
.
end
())
{
if
(
!
replace_running_query
)
throw
Exception
(
"Query with id = "
+
query_id_
+
" is already running."
,
ErrorCodes
::
QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING
);
element
->
second
->
is_cancelled
=
true
;
/// В случае если запрос отменяется, данные о нем удаляются из мапа в момент отмены.
queries
->
second
.
erase
(
element
);
}
}
}
++
cur_size
;
res
=
new
Entry
(
*
this
,
cont
.
emplace
(
cont
.
end
(),
query_
,
user_
,
query_id_
,
ip_address_
,
max_memory_usage
));
if
(
!
query_id_
.
empty
())
user_to_queries
[
user_
][
query_id_
]
=
&
res
->
get
();
}
return
res
;
}
/// Количество одновременно выполняющихся запросов.
/// Количество одновременно выполняющихся запросов.
size_t
size
()
const
{
return
cur_size
;
}
size_t
size
()
const
{
return
cur_size
;
}
/// Получить текущее состояние (копию) списка запросов.
/// Получить текущее состояние (копию) списка запросов.
Contain
t
er
get
()
const
Container
get
()
const
{
{
Poco
::
ScopedLock
<
Poco
::
FastMutex
>
lock
(
mutex
);
Poco
::
ScopedLock
<
Poco
::
FastMutex
>
lock
(
mutex
);
return
cont
;
return
cont
;
...
...
dbms/include/DB/Interpreters/QueryLog.h
0 → 100644
浏览文件 @
389958d7
#pragma once
#include <thread>
#include <boost/noncopyable.hpp>
#include <Poco/Net/IPAddress.h>
#include <DB/Core/Types.h>
#include <DB/Common/ConcurrentBoundedQueue.h>
#include <DB/Interpreters/Context.h>
#include <statdaemons/Stopwatch.h>
namespace
DB
{
/** Позволяет логгировать информацию о выполнении запросов:
* - о начале выполнения запроса;
* - метрики производительности, после выполнения запроса;
* - об ошибках при выполнении запроса.
*
* Логгирование производится асинхронно. Данные передаются в очередь, откуда их читает отдельный поток.
* Этот поток записывает лог в предназначенную для этого таблицу не чаще, чем с заданной периодичностью.
*/
/** Что логгировать.
* Структура может меняться при изменении версии сервера.
* Если при первой записи обнаруживается, что имеющаяся таблица с логами имеет неподходящую стрктуру,
* то эта таблица переименовывается (откладывается в сторону) и создаётся новая таблица.
*/
struct
QueryLogElement
{
enum
Type
{
SHUTDOWN
=
0
,
/// Эта запись имеет служебное значение.
QUERY_START
=
1
,
QUERY_FINISH
=
2
,
};
enum
Interface
{
TCP
=
1
,
HTTP
=
2
,
OLAP_HTTP
=
3
,
};
enum
HTTPMethod
{
UNKNOWN
=
0
,
GET
=
1
,
POST
=
2
,
};
Type
type
;
/// В зависимости от типа, не все поля могут быть заполнены.
time_t
event_time
;
time_t
query_start_time
;
UInt64
query_duration_ms
;
UInt64
read_rows
;
UInt64
read_bytes
;
UInt64
result_rows
;
UInt64
result_bytes
;
String
query
;
Interface
interface
;
HTTPMethod
http_method
;
Poco
::
Net
::
IPAddress
ip_address
;
String
user
;
String
query_id
;
};
#define DBMS_QUERY_LOG_QUEUE_SIZE 1024
class
QueryLog
:
private
boost
::
noncopyable
{
public:
/** Передаётся имя таблицы, в которую писать лог.
* Если таблица не существует, то она создаётся с движком MergeTree, с ключём по event_time.
* Если таблица существует, то проверяется, подходящая ли у неё структура.
* Если структура подходящая, то будет использоваться эта таблица.
* Если нет - то существующая таблица переименовывается в такую же, но с добавлением суффикса _N на конце,
* где N - минимальное число, начиная с 1 такое, что таблицы с таким именем ещё нет;
* и создаётся новая таблица, как будто существующей таблицы не было.
*/
QueryLog
(
Context
&
context_
,
const
String
&
database_name_
,
const
String
&
table_name_
,
size_t
flush_interval_milliseconds_
)
:
context
(
context_
),
database_name
(
database_name_
),
table_name
(
table_name_
),
flush_interval_milliseconds
(
flush_interval_milliseconds_
)
{
data
.
reserve
(
DBMS_QUERY_LOG_QUEUE_SIZE
);
// TODO
saving_thread
=
std
::
thread
([
this
]
{
threadFunction
();
});
}
~
QueryLog
()
{
/// Говорим потоку, что надо завершиться.
QueryLogElement
elem
;
elem
.
type
=
QueryLogElement
::
SHUTDOWN
;
queue
.
push
(
elem
);
saving_thread
.
join
();
}
/** Добавить запись в лог.
* Сохранение в таблицу делается асинхронно, и в случае сбоя, запись может никуда не попасть.
*/
void
add
(
const
QueryLogElement
&
element
)
{
/// Здесь может быть блокировка. Возможно, в случае переполнения очереди, лучше сразу кидать эксепшен. Или даже отказаться от логгирования запроса.
queue
.
push
(
element
);
}
private:
Context
&
context
;
const
String
database_name
;
const
String
table_name
;
StoragePtr
table
;
const
size_t
flush_interval_milliseconds
;
/// Очередь всё-таки ограничена. Но размер достаточно большой, чтобы не блокироваться во всех нормальных ситуациях.
ConcurrentBoundedQueue
<
QueryLogElement
>
queue
{
DBMS_QUERY_LOG_QUEUE_SIZE
};
/** Данные, которые были вынуты из очереди. Здесь данные накапливаются, пока не пройдёт достаточное количество времени.
* Можно было бы использовать двойную буферизацию, но предполагается,
* что запись в таблицу с логом будет быстрее, чем обработка большой пачки запросов.
*/
std
::
vector
<
QueryLogElement
>
data
;
/** В этом потоке данные вынимаются из queue, складываются в data, а затем вставляются в таблицу.
*/
std
::
thread
saving_thread
;
void
threadFunction
()
{
Stopwatch
time_after_last_write
;
bool
first
=
true
;
while
(
true
)
{
try
{
if
(
first
)
{
time_after_last_write
.
restart
();
first
=
false
;
}
QueryLogElement
element
;
bool
has_element
=
false
;
if
(
data
.
empty
())
{
element
=
queue
.
pop
();
has_element
=
true
;
}
else
{
size_t
milliseconds_elapsed
=
time_after_last_write
.
elapsed
()
/
1000000
;
if
(
milliseconds_elapsed
<
flush_interval_milliseconds
)
has_element
=
queue
.
tryPop
(
element
,
flush_interval_milliseconds
-
milliseconds_elapsed
);
}
if
(
has_element
)
{
if
(
element
.
type
=
QueryLogElement
::
SHUTDOWN
)
{
flush
();
break
;
}
else
data
.
push_back
(
element
);
}
size_t
milliseconds_elapsed
=
time_after_last_write
.
elapsed
()
/
1000000
;
if
(
milliseconds_elapsed
>=
flush_interval_milliseconds
)
{
/// Записываем данные в таблицу.
flush
();
time_after_last_write
.
restart
();
}
}
catch
(...)
{
/// В случае ошибки теряем накопленные записи, чтобы не блокироваться.
data
.
clear
();
tryLogCurrentException
(
__PRETTY_FUNCTION__
);
}
}
}
Block
createBlock
()
{
return
{
{
new
ColumnUInt8
,
new
DataTypeUInt8
,
"type"
},
{
new
ColumnUInt32
,
new
DataTypeDateTime
,
"event_time"
},
{
new
ColumnUInt32
,
new
DataTypeDateTime
,
"query_start_time"
},
};
/* time_t event_time;
time_t query_start_time;
UInt64 query_duration_ms;
UInt64 read_rows;
UInt64 read_bytes;
UInt64 result_rows;
UInt64 result_bytes;
String query;
Interface interface;
HTTPMethod http_method;
Poco::Net::IPAddress ip_address;
String user;
String query_id;*/
}
void
flush
()
{
try
{
Block
block
=
createBlock
();
// TODO Формирование блока и запись.
}
catch
(...)
{
tryLogCurrentException
(
__PRETTY_FUNCTION__
);
}
data
.
clear
();
}
};
}
dbms/include/DB/Interpreters/QueryPriorities.h
0 → 100644
浏览文件 @
389958d7
#pragma once
#include <map>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <chrono>
/** Реализует приоритеты запросов.
* Позволяет приостанавливать выполнение запроса, если выполняется хотя бы один более приоритетный запрос.
*
* Величина приоритета - целое число, чем меньше - тем больше приоритет.
*
* Приоритет 0 считается особенным - запросы с таким приоритетом выполняются всегда,
* не зависят от других запросов и не влияют на другие запросы.
* То есть 0 означает - не использовать приоритеты.
*
* NOTE Возможности сделать лучше:
* - реализовать ограничение на максимальное количество запросов с таким приоритетом.
*/
class
QueryPriorities
{
public:
using
Priority
=
int
;
private:
friend
struct
Handle
;
using
Count
=
int
;
/// Количество выполняющихся сейчас запросов с заданным приоритетом.
using
Container
=
std
::
map
<
Priority
,
Count
>
;
std
::
mutex
mutex
;
std
::
condition_variable
condvar
;
Container
container
;
/** Если есть более приоритетные запросы - спать, пока они не перестанут быть или не истечёт таймаут.
* Возвращает true, если более приоритетные запросы исчезли на момент возврата из функции, false, если истёк таймаут.
*/
template
<
typename
Duration
>
bool
waitIfNeed
(
Priority
priority
,
Duration
timeout
)
{
if
(
0
==
priority
)
return
true
;
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex
);
while
(
true
)
{
/// Если ли хотя бы один более приоритетный запрос?
bool
found
=
false
;
for
(
const
auto
&
value
:
container
)
{
if
(
value
.
first
>=
priority
)
break
;
if
(
value
.
second
>
0
)
{
found
=
true
;
break
;
}
}
if
(
!
found
)
return
true
;
if
(
std
::
cv_status
::
timeout
==
condvar
.
wait_for
(
lock
,
timeout
))
return
false
;
}
}
public:
struct
HandleImpl
{
private:
QueryPriorities
&
parent
;
QueryPriorities
::
Container
::
value_type
&
value
;
public:
HandleImpl
(
QueryPriorities
&
parent_
,
QueryPriorities
::
Container
::
value_type
&
value_
)
:
parent
(
parent_
),
value
(
value_
)
{}
~
HandleImpl
()
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
parent
.
mutex
);
--
value
.
second
;
}
parent
.
condvar
.
notify_all
();
}
template
<
typename
Duration
>
bool
waitIfNeed
(
Duration
timeout
)
{
return
parent
.
waitIfNeed
(
value
.
first
,
timeout
);
}
};
using
Handle
=
std
::
shared_ptr
<
HandleImpl
>
;
/** Зарегистрировать, что запрос с заданным приоритетом выполняется.
* Возвращается объект, в деструкторе которого, запись о запросе удаляется.
*/
Handle
insert
(
Priority
priority
)
{
if
(
0
==
priority
)
return
{};
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
auto
it
=
container
.
emplace
(
priority
,
0
).
first
;
++
it
->
second
;
return
std
::
make_shared
<
HandleImpl
>
(
*
this
,
*
it
);
}
};
dbms/include/DB/Interpreters/Settings.h
浏览文件 @
389958d7
...
@@ -132,6 +132,9 @@ struct Settings
...
@@ -132,6 +132,9 @@ struct Settings
\
\
/** Позволяет выбирать метод сжатия данных при записи */
\
/** Позволяет выбирать метод сжатия данных при записи */
\
M(SettingCompressionMethod, network_compression_method, CompressionMethod::LZ4) \
M(SettingCompressionMethod, network_compression_method, CompressionMethod::LZ4) \
\
/** Приоритет запроса. 1 - самый высокий, больше - ниже; 0 - не использовать приоритеты. */
\
M(SettingUInt64, priority, 0) \
/// Всевозможные ограничения на выполнение запроса.
/// Всевозможные ограничения на выполнение запроса.
Limits
limits
;
Limits
limits
;
...
...
dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h
浏览文件 @
389958d7
...
@@ -42,9 +42,6 @@ public:
...
@@ -42,9 +42,6 @@ public:
if
(
right
!=
rhs
.
right
)
if
(
right
!=
rhs
.
right
)
return
right
<
rhs
.
right
;
return
right
<
rhs
.
right
;
if
(
level
!=
rhs
.
level
)
return
level
<
rhs
.
level
;
return
false
;
return
false
;
}
}
...
@@ -53,7 +50,6 @@ public:
...
@@ -53,7 +50,6 @@ public:
{
{
return
left_month
==
rhs
.
left_month
/// Куски за разные месяцы не объединяются
return
left_month
==
rhs
.
left_month
/// Куски за разные месяцы не объединяются
&&
right_month
==
rhs
.
right_month
&&
right_month
==
rhs
.
right_month
&&
level
>
rhs
.
level
&&
left_date
<=
rhs
.
left_date
&&
left_date
<=
rhs
.
left_date
&&
right_date
>=
rhs
.
right_date
&&
right_date
>=
rhs
.
right_date
&&
left
<=
rhs
.
left
&&
left
<=
rhs
.
left
...
...
dbms/include/DB/Storages/StorageReplicatedMergeTree.h
浏览文件 @
389958d7
...
@@ -134,7 +134,7 @@ public:
...
@@ -134,7 +134,7 @@ public:
void
getStatus
(
Status
&
res
,
bool
with_zk_fields
=
true
);
void
getStatus
(
Status
&
res
,
bool
with_zk_fields
=
true
);
private:
private:
void
dropUnreplicatedPartition
(
const
Field
&
partition
,
const
Settings
&
settings
);
void
dropUnreplicatedPartition
(
const
Field
&
partition
,
bool
detach
,
const
Settings
&
settings
);
friend
class
ReplicatedMergeTreeBlockOutputStream
;
friend
class
ReplicatedMergeTreeBlockOutputStream
;
friend
class
ReplicatedMergeTreeRestartingThread
;
friend
class
ReplicatedMergeTreeRestartingThread
;
...
...
dbms/src/DataStreams/BlockIO.cpp
0 → 100644
浏览文件 @
389958d7
#include <DB/Interpreters/ProcessList.h>
#include <DB/DataStreams/BlockIO.h>
namespace
DB
{
BlockIO
::~
BlockIO
()
=
default
;
}
dbms/src/DataStreams/IProfilingBlockInputStream.cpp
浏览文件 @
389958d7
...
@@ -5,6 +5,7 @@
...
@@ -5,6 +5,7 @@
#include <DB/Columns/ColumnConst.h>
#include <DB/Columns/ColumnConst.h>
#include <DB/Interpreters/Quota.h>
#include <DB/Interpreters/Quota.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
...
@@ -320,7 +321,7 @@ void IProfilingBlockInputStream::setProgressCallback(ProgressCallback callback)
...
@@ -320,7 +321,7 @@ void IProfilingBlockInputStream::setProgressCallback(ProgressCallback callback)
}
}
void
IProfilingBlockInputStream
::
setProcessListElement
(
ProcessList
::
Element
*
elem
)
void
IProfilingBlockInputStream
::
setProcessListElement
(
ProcessListElement
*
elem
)
{
{
process_list_elem
=
elem
;
process_list_elem
=
elem
;
...
...
dbms/src/Functions/FunctionsMiscellaneous.cpp
浏览文件 @
389958d7
...
@@ -326,6 +326,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
...
@@ -326,6 +326,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
factory
.
registerFunction
<
FunctionMaterialize
>
();
factory
.
registerFunction
<
FunctionMaterialize
>
();
factory
.
registerFunction
<
FunctionIgnore
>
();
factory
.
registerFunction
<
FunctionIgnore
>
();
factory
.
registerFunction
<
FunctionArrayJoin
>
();
factory
.
registerFunction
<
FunctionArrayJoin
>
();
factory
.
registerFunction
<
FunctionReplicate
>
();
factory
.
registerFunction
<
FunctionBar
>
();
factory
.
registerFunction
<
FunctionBar
>
();
factory
.
registerFunction
<
FunctionTuple
>
();
factory
.
registerFunction
<
FunctionTuple
>
();
...
...
dbms/src/Interpreters/ProcessList.cpp
0 → 100644
浏览文件 @
389958d7
#include <DB/Interpreters/ProcessList.h>
namespace
DB
{
ProcessList
::
EntryPtr
ProcessList
::
insert
(
const
String
&
query_
,
const
String
&
user_
,
const
String
&
query_id_
,
const
Poco
::
Net
::
IPAddress
&
ip_address_
,
size_t
max_memory_usage
,
size_t
max_wait_milliseconds
,
bool
replace_running_query
,
QueryPriorities
::
Priority
priority
)
{
EntryPtr
res
;
{
Poco
::
ScopedLock
<
Poco
::
FastMutex
>
lock
(
mutex
);
if
(
max_size
&&
cur_size
>=
max_size
&&
(
!
max_wait_milliseconds
||
!
have_space
.
tryWait
(
mutex
,
max_wait_milliseconds
)))
throw
Exception
(
"Too much simultaneous queries. Maximum: "
+
toString
(
max_size
),
ErrorCodes
::
TOO_MUCH_SIMULTANEOUS_QUERIES
);
if
(
!
query_id_
.
empty
())
{
UserToQueries
::
iterator
queries
=
user_to_queries
.
find
(
user_
);
if
(
queries
!=
user_to_queries
.
end
())
{
QueryToElement
::
iterator
element
=
queries
->
second
.
find
(
query_id_
);
if
(
element
!=
queries
->
second
.
end
())
{
if
(
!
replace_running_query
)
throw
Exception
(
"Query with id = "
+
query_id_
+
" is already running."
,
ErrorCodes
::
QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING
);
element
->
second
->
is_cancelled
=
true
;
/// В случае если запрос отменяется, данные о нем удаляются из мапа в момент отмены.
queries
->
second
.
erase
(
element
);
}
}
}
++
cur_size
;
res
.
reset
(
new
Entry
(
*
this
,
cont
.
emplace
(
cont
.
end
(),
query_
,
user_
,
query_id_
,
ip_address_
,
max_memory_usage
,
priorities
.
insert
(
priority
))));
if
(
!
query_id_
.
empty
())
user_to_queries
[
user_
][
query_id_
]
=
&
res
->
get
();
}
return
res
;
}
ProcessListEntry
::~
ProcessListEntry
()
{
Poco
::
ScopedLock
<
Poco
::
FastMutex
>
lock
(
parent
.
mutex
);
/// В случае, если запрос отменяется, данные о нем удаляются из мапа в момент отмены.
if
(
!
it
->
is_cancelled
&&
!
it
->
query_id
.
empty
())
{
ProcessList
::
UserToQueries
::
iterator
queries
=
parent
.
user_to_queries
.
find
(
it
->
user
);
if
(
queries
!=
parent
.
user_to_queries
.
end
())
{
ProcessList
::
QueryToElement
::
iterator
element
=
queries
->
second
.
find
(
it
->
query_id
);
if
(
element
!=
queries
->
second
.
end
())
queries
->
second
.
erase
(
element
);
}
}
parent
.
cont
.
erase
(
it
);
--
parent
.
cur_size
;
parent
.
have_space
.
signal
();
}
}
dbms/src/Interpreters/executeQuery.cpp
浏览文件 @
389958d7
...
@@ -14,6 +14,7 @@
...
@@ -14,6 +14,7 @@
#include <DB/Interpreters/Quota.h>
#include <DB/Interpreters/Quota.h>
#include <DB/Interpreters/InterpreterFactory.h>
#include <DB/Interpreters/InterpreterFactory.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/Interpreters/executeQuery.h>
...
@@ -88,11 +89,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
...
@@ -88,11 +89,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
ProcessList
::
EntryPtr
process_list_entry
;
ProcessList
::
EntryPtr
process_list_entry
;
if
(
!
internal
&&
nullptr
==
typeid_cast
<
const
ASTShowProcesslistQuery
*>
(
&*
ast
))
if
(
!
internal
&&
nullptr
==
typeid_cast
<
const
ASTShowProcesslistQuery
*>
(
&*
ast
))
{
{
const
Settings
&
settings
=
context
.
getSettingsRef
();
process_list_entry
=
context
.
getProcessList
().
insert
(
process_list_entry
=
context
.
getProcessList
().
insert
(
query
,
context
.
getUser
(),
context
.
getCurrentQueryId
(),
context
.
getIPAddress
(),
query
,
context
.
getUser
(),
context
.
getCurrentQueryId
(),
context
.
getIPAddress
(),
context
.
getSettingsRef
().
limits
.
max_memory_usage
,
settings
.
limits
.
max_memory_usage
,
context
.
getSettingsRef
().
queue_max_wait_ms
.
totalMilliseconds
(),
settings
.
queue_max_wait_ms
.
totalMilliseconds
(),
context
.
getSettingsRef
().
replace_running_query
);
settings
.
replace_running_query
,
settings
.
priority
);
context
.
setProcessListElement
(
&
process_list_entry
->
get
());
context
.
setProcessListElement
(
&
process_list_entry
->
get
());
}
}
...
...
dbms/src/Parsers/ParserAlterQuery.cpp
浏览文件 @
389958d7
...
@@ -150,6 +150,12 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa
...
@@ -150,6 +150,12 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa
{
{
ws
.
ignore
(
pos
,
end
);
ws
.
ignore
(
pos
,
end
);
if
(
s_unreplicated
.
ignore
(
pos
,
end
,
max_parsed_pos
,
expected
))
{
params
.
unreplicated
=
true
;
ws
.
ignore
(
pos
,
end
);
}
if
(
!
s_partition
.
ignore
(
pos
,
end
,
max_parsed_pos
,
expected
))
if
(
!
s_partition
.
ignore
(
pos
,
end
,
max_parsed_pos
,
expected
))
return
false
;
return
false
;
...
...
dbms/src/Server/Server.cpp
浏览文件 @
389958d7
...
@@ -20,6 +20,7 @@
...
@@ -20,6 +20,7 @@
#include <DB/Common/Macros.h>
#include <DB/Common/Macros.h>
#include <DB/Common/getFQDNOrHostName.h>
#include <DB/Common/getFQDNOrHostName.h>
#include <DB/Interpreters/loadMetadata.h>
#include <DB/Interpreters/loadMetadata.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Storages/StorageSystemNumbers.h>
#include <DB/Storages/StorageSystemNumbers.h>
#include <DB/Storages/StorageSystemTables.h>
#include <DB/Storages/StorageSystemTables.h>
#include <DB/Storages/StorageSystemParts.h>
#include <DB/Storages/StorageSystemParts.h>
...
...
dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
浏览文件 @
389958d7
...
@@ -340,8 +340,10 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
...
@@ -340,8 +340,10 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
const
Names
&
virt_columns
,
const
Names
&
virt_columns
,
const
Settings
&
settings
)
const
Settings
&
settings
)
{
{
size_t
min_marks_for_concurrent_read
=
(
settings
.
merge_tree_min_rows_for_concurrent_read
+
data
.
index_granularity
-
1
)
/
data
.
index_granularity
;
const
size_t
min_marks_for_concurrent_read
=
size_t
max_marks_to_use_cache
=
(
settings
.
merge_tree_max_rows_to_use_cache
+
data
.
index_granularity
-
1
)
/
data
.
index_granularity
;
(
settings
.
merge_tree_min_rows_for_concurrent_read
+
data
.
index_granularity
-
1
)
/
data
.
index_granularity
;
const
size_t
max_marks_to_use_cache
=
(
settings
.
merge_tree_max_rows_to_use_cache
+
data
.
index_granularity
-
1
)
/
data
.
index_granularity
;
/// На всякий случай перемешаем куски.
/// На всякий случай перемешаем куски.
std
::
random_shuffle
(
parts
.
begin
(),
parts
.
end
());
std
::
random_shuffle
(
parts
.
begin
(),
parts
.
end
());
...
@@ -354,12 +356,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
...
@@ -354,12 +356,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
/// Пусть отрезки будут перечислены справа налево, чтобы можно было выбрасывать самый левый отрезок с помощью pop_back().
/// Пусть отрезки будут перечислены справа налево, чтобы можно было выбрасывать самый левый отрезок с помощью pop_back().
std
::
reverse
(
parts
[
i
].
ranges
.
begin
(),
parts
[
i
].
ranges
.
end
());
std
::
reverse
(
parts
[
i
].
ranges
.
begin
(),
parts
[
i
].
ranges
.
end
());
sum_marks_in_parts
[
i
]
=
0
;
for
(
const
auto
&
range
:
parts
[
i
].
ranges
)
for
(
size_t
j
=
0
;
j
<
parts
[
i
].
ranges
.
size
();
++
j
)
{
MarkRange
&
range
=
parts
[
i
].
ranges
[
j
];
sum_marks_in_parts
[
i
]
+=
range
.
end
-
range
.
begin
;
sum_marks_in_parts
[
i
]
+=
range
.
end
-
range
.
begin
;
}
sum_marks
+=
sum_marks_in_parts
[
i
];
sum_marks
+=
sum_marks_in_parts
[
i
];
}
}
...
@@ -370,7 +369,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
...
@@ -370,7 +369,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
if
(
sum_marks
>
0
)
if
(
sum_marks
>
0
)
{
{
size_t
min_marks_per_thread
=
(
sum_marks
-
1
)
/
threads
+
1
;
const
size_t
min_marks_per_thread
=
(
sum_marks
-
1
)
/
threads
+
1
;
for
(
size_t
i
=
0
;
i
<
threads
&&
!
parts
.
empty
();
++
i
)
for
(
size_t
i
=
0
;
i
<
threads
&&
!
parts
.
empty
();
++
i
)
{
{
...
@@ -415,10 +414,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
...
@@ -415,10 +414,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
throw
Exception
(
"Unexpected end of ranges while spreading marks among threads"
,
ErrorCodes
::
LOGICAL_ERROR
);
throw
Exception
(
"Unexpected end of ranges while spreading marks among threads"
,
ErrorCodes
::
LOGICAL_ERROR
);
MarkRange
&
range
=
part
.
ranges
.
back
();
MarkRange
&
range
=
part
.
ranges
.
back
();
size_t
marks_in_range
=
range
.
end
-
range
.
begin
;
size_t
marks_to_get_from_range
=
std
::
min
(
marks_in_range
,
need_marks
);
const
size_t
marks_in_range
=
range
.
end
-
range
.
begin
;
ranges_to_get_from_part
.
push_back
(
MarkRange
(
range
.
begin
,
range
.
begin
+
marks_to_get_from_range
));
const
size_t
marks_to_get_from_range
=
std
::
min
(
marks_in_range
,
need_marks
);
ranges_to_get_from_part
.
emplace_back
(
range
.
begin
,
range
.
begin
+
marks_to_get_from_range
);
range
.
begin
+=
marks_to_get_from_range
;
range
.
begin
+=
marks_to_get_from_range
;
marks_in_part
-=
marks_to_get_from_range
;
marks_in_part
-=
marks_to_get_from_range
;
need_marks
-=
marks_to_get_from_range
;
need_marks
-=
marks_to_get_from_range
;
...
...
dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp
浏览文件 @
389958d7
...
@@ -11,6 +11,7 @@
...
@@ -11,6 +11,7 @@
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/IO/HashingReadBuffer.h>
#include <DB/IO/HashingReadBuffer.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/Columns/ColumnsNumber.h>
#include <statdaemons/ext/scope_guard.hpp>
namespace
DB
namespace
DB
...
@@ -115,6 +116,12 @@ struct Stream
...
@@ -115,6 +116,12 @@ struct Stream
readIntBinary
(
mrk_mark
.
offset_in_compressed_file
,
mrk_hashing_buf
);
readIntBinary
(
mrk_mark
.
offset_in_compressed_file
,
mrk_hashing_buf
);
readIntBinary
(
mrk_mark
.
offset_in_decompressed_block
,
mrk_hashing_buf
);
readIntBinary
(
mrk_mark
.
offset_in_decompressed_block
,
mrk_hashing_buf
);
/// На всякий случай, сохраним смещение в файле и размер предыдущего блока.
SCOPE_EXIT
(
prev_offset_in_compressed_file
=
mrk_mark
.
offset_in_compressed_file
;
prev_buffer_size
=
uncompressed_hashing_buf
.
buffer
().
size
();
);
bool
has_alternative_mark
=
false
;
bool
has_alternative_mark
=
false
;
MarkInCompressedFile
alternative_data_mark
;
MarkInCompressedFile
alternative_data_mark
;
MarkInCompressedFile
data_mark
;
MarkInCompressedFile
data_mark
;
...
@@ -138,6 +145,18 @@ struct Stream
...
@@ -138,6 +145,18 @@ struct Stream
if
(
uncompressed_hashing_buf
.
eof
())
if
(
uncompressed_hashing_buf
.
eof
())
return
;
return
;
}
}
else
if
(
uncompressed_hashing_buf
.
offset
()
==
0
)
{
/// Восстановим засечку на конец предыдущего блока по сохраненным данным
has_alternative_mark
=
true
;
alternative_data_mark
.
offset_in_compressed_file
=
prev_offset_in_compressed_file
;
alternative_data_mark
.
offset_in_decompressed_block
=
prev_buffer_size
;
if
(
mrk_mark
==
alternative_data_mark
)
return
;
}
std
::
cout
<<
"mrk_mark "
<<
mrk_mark
.
offset_in_compressed_file
<<
' '
<<
mrk_mark
.
offset_in_decompressed_block
<<
std
::
endl
;
data_mark
.
offset_in_compressed_file
=
compressed_hashing_buf
.
count
()
-
uncompressing_buf
.
getSizeCompressed
();
data_mark
.
offset_in_compressed_file
=
compressed_hashing_buf
.
count
()
-
uncompressing_buf
.
getSizeCompressed
();
data_mark
.
offset_in_decompressed_block
=
uncompressed_hashing_buf
.
offset
();
data_mark
.
offset_in_decompressed_block
=
uncompressed_hashing_buf
.
offset
();
...
@@ -161,6 +180,10 @@ struct Stream
...
@@ -161,6 +180,10 @@ struct Stream
checksums
.
files
[
name
+
".mrk"
]
=
MergeTreeData
::
DataPart
::
Checksums
::
Checksum
(
checksums
.
files
[
name
+
".mrk"
]
=
MergeTreeData
::
DataPart
::
Checksums
::
Checksum
(
mrk_hashing_buf
.
count
(),
mrk_hashing_buf
.
getHash
());
mrk_hashing_buf
.
count
(),
mrk_hashing_buf
.
getHash
());
}
}
private:
size_t
prev_offset_in_compressed_file
{};
size_t
prev_buffer_size
{};
};
};
/// Возвращает количество строк. Добавляет в checksums чексуммы всех файлов столбца.
/// Возвращает количество строк. Добавляет в checksums чексуммы всех файлов столбца.
...
...
dbms/src/Storages/StorageReplicatedMergeTree.cpp
浏览文件 @
389958d7
...
@@ -2224,7 +2224,7 @@ static String getFakePartNameForDrop(const String & month_name, UInt64 left, UIn
...
@@ -2224,7 +2224,7 @@ static String getFakePartNameForDrop(const String & month_name, UInt64 left, UIn
}
}
void
StorageReplicatedMergeTree
::
dropUnreplicatedPartition
(
const
Field
&
partition
,
const
Settings
&
settings
)
void
StorageReplicatedMergeTree
::
dropUnreplicatedPartition
(
const
Field
&
partition
,
const
bool
detach
,
const
Settings
&
settings
)
{
{
if
(
!
unreplicated_data
)
if
(
!
unreplicated_data
)
return
;
return
;
...
@@ -2247,10 +2247,13 @@ void StorageReplicatedMergeTree::dropUnreplicatedPartition(const Field & partiti
...
@@ -2247,10 +2247,13 @@ void StorageReplicatedMergeTree::dropUnreplicatedPartition(const Field & partiti
LOG_DEBUG
(
log
,
"Removing unreplicated part "
<<
part
->
name
);
LOG_DEBUG
(
log
,
"Removing unreplicated part "
<<
part
->
name
);
++
removed_parts
;
++
removed_parts
;
unreplicated_data
->
replaceParts
({
part
},
{},
false
);
if
(
detach
)
unreplicated_data
->
renameAndDetachPart
(
part
,
""
);
else
unreplicated_data
->
replaceParts
({
part
},
{},
false
);
}
}
LOG_INFO
(
log
,
"Removed "
<<
removed_parts
<<
" unreplicated parts inside "
<<
apply_visitor
(
FieldVisitorToString
(),
partition
)
<<
"."
);
LOG_INFO
(
log
,
(
detach
?
"Detached "
:
"Removed "
)
<<
removed_parts
<<
" unreplicated parts inside "
<<
apply_visitor
(
FieldVisitorToString
(),
partition
)
<<
"."
);
}
}
...
@@ -2258,13 +2261,7 @@ void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach,
...
@@ -2258,13 +2261,7 @@ void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach,
{
{
if
(
unreplicated
)
if
(
unreplicated
)
{
{
if
(
detach
)
dropUnreplicatedPartition
(
field
,
detach
,
settings
);
throw
Exception
{
"DETACH UNREPLICATED PATITION not supported"
,
ErrorCodes
::
LOGICAL_ERROR
};
dropUnreplicatedPartition
(
field
,
settings
);
return
;
return
;
}
}
...
...
dbms/src/Storages/StorageSystemProcesses.cpp
浏览文件 @
389958d7
...
@@ -2,6 +2,7 @@
...
@@ -2,6 +2,7 @@
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Storages/StorageSystemProcesses.h>
#include <DB/Storages/StorageSystemProcesses.h>
...
...
dbms/tests/queries/0_stateless/00177_inserts_through_http_parts.sh
浏览文件 @
389958d7
#!/bin/bash
#!/bin/bash
curl
'http://localhost:8123/?query=DROP+TABLE'
-d
'IF EXISTS test.insert'
curl
-sS
'http://localhost:8123/?query=DROP+TABLE'
-d
'IF EXISTS test.insert'
curl
'http://localhost:8123/?query=CREATE'
-d
'TABLE test.insert (x UInt8) ENGINE = Memory'
curl
-sS
'http://localhost:8123/?query=CREATE'
-d
'TABLE test.insert (x UInt8) ENGINE = Memory'
curl
'http://localhost:8123/'
-d
'INSERT INTO test.insert VALUES (1),(2)'
curl
-sS
'http://localhost:8123/'
-d
'INSERT INTO test.insert VALUES (1),(2)'
curl
'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES'
-d
'(3),(4)'
curl
-sS
'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES'
-d
'(3),(4)'
curl
'http://localhost:8123/?query=INSERT+INTO+test.insert'
-d
'VALUES (5),(6)'
curl
-sS
'http://localhost:8123/?query=INSERT+INTO+test.insert'
-d
'VALUES (5),(6)'
curl
'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES+(7)'
-d
',(8)'
curl
-sS
'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES+(7)'
-d
',(8)'
curl
'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES+(9),(10)'
-d
' '
curl
-sS
'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES+(9),(10)'
-d
' '
curl
'http://localhost:8123/'
-d
'SELECT x FROM test.insert ORDER BY x'
curl
-sS
'http://localhost:8123/'
-d
'SELECT x FROM test.insert ORDER BY x'
curl
'http://localhost:8123/?query=DROP+TABLE'
-d
'test.insert'
curl
-sS
'http://localhost:8123/?query=DROP+TABLE'
-d
'test.insert'
dbms/tests/queries/0_stateless/00178_function_replicate.reference
0 → 100644
浏览文件 @
389958d7
0 [] [] [] [] []
1 [0] [1] ['1'] [[0]] [['0']]
2 [0,1] [2,2] ['2','2'] [[0,1],[0,1]] [['0','1'],['0','1']]
3 [0,1,2] [3,3,3] ['3','3','3'] [[0,1,2],[0,1,2],[0,1,2]] [['0','1','2'],['0','1','2'],['0','1','2']]
4 [0,1,2,3] [4,4,4,4] ['4','4','4','4'] [[0,1,2,3],[0,1,2,3],[0,1,2,3],[0,1,2,3]] [['0','1','2','3'],['0','1','2','3'],['0','1','2','3'],['0','1','2','3']]
5 [0,1,2,3,4] [5,5,5,5,5] ['5','5','5','5','5'] [[0,1,2,3,4],[0,1,2,3,4],[0,1,2,3,4],[0,1,2,3,4],[0,1,2,3,4]] [['0','1','2','3','4'],['0','1','2','3','4'],['0','1','2','3','4'],['0','1','2','3','4'],['0','1','2','3','4']]
6 [0,1,2,3,4,5] [6,6,6,6,6,6] ['6','6','6','6','6','6'] [[0,1,2,3,4,5],[0,1,2,3,4,5],[0,1,2,3,4,5],[0,1,2,3,4,5],[0,1,2,3,4,5],[0,1,2,3,4,5]] [['0','1','2','3','4','5'],['0','1','2','3','4','5'],['0','1','2','3','4','5'],['0','1','2','3','4','5'],['0','1','2','3','4','5'],['0','1','2','3','4','5']]
7 [0,1,2,3,4,5,6] [7,7,7,7,7,7,7] ['7','7','7','7','7','7','7'] [[0,1,2,3,4,5,6],[0,1,2,3,4,5,6],[0,1,2,3,4,5,6],[0,1,2,3,4,5,6],[0,1,2,3,4,5,6],[0,1,2,3,4,5,6],[0,1,2,3,4,5,6]] [['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6']]
8 [0,1,2,3,4,5,6,7] [8,8,8,8,8,8,8,8] ['8','8','8','8','8','8','8','8'] [[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7]] [['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7']]
9 [0,1,2,3,4,5,6,7,8] [9,9,9,9,9,9,9,9,9] ['9','9','9','9','9','9','9','9','9'] [[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8]] [['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8']]
dbms/tests/queries/0_stateless/00178_function_replicate.sql
0 → 100644
浏览文件 @
389958d7
SELECT
number
,
range
(
number
)
AS
arr
,
replicate
(
number
,
arr
),
replicate
(
toString
(
number
),
arr
),
replicate
(
range
(
number
),
arr
),
replicate
(
arrayMap
(
x
->
toString
(
x
),
range
(
number
)),
arr
)
FROM
system
.
numbers
LIMIT
10
;
dbms/tests/queries/0_stateless/00179_lambdas_with_common_expressions_and_filter.reference
0 → 100644
浏览文件 @
389958d7
[0]
[0,1,2]
[0,1,2,3,4]
[0,1,2,3,4,5,6]
[0,1,2,3,4,5,6,7,8]
dbms/tests/queries/0_stateless/00179_lambdas_with_common_expressions_and_filter.sql
0 → 100644
浏览文件 @
389958d7
SELECT
arrayMap
(
x
->
number
!=
-
1
?
x
:
0
,
arr
)
FROM
(
SELECT
number
,
range
(
number
)
AS
arr
FROM
system
.
numbers
LIMIT
10
)
WHERE
number
%
2
=
1
AND
arrayExists
(
x
->
number
!=
-
1
,
arr
);
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录