Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
089e71d9
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
089e71d9
编写于
4月 17, 2018
作者:
A
alexey-milovidov
提交者:
GitHub
4月 17, 2018
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #2235 from yandex/small-enhancements
Small enhancements
上级
c9c09a9d
400ad557
变更
23
隐藏空白更改
内联
并排
Showing
23 changed file
with
290 addition
and
91 deletion
+290
-91
dbms/src/DataStreams/IBlockInputStream.h
dbms/src/DataStreams/IBlockInputStream.h
+5
-2
dbms/src/DataStreams/IProfilingBlockInputStream.h
dbms/src/DataStreams/IProfilingBlockInputStream.h
+4
-2
dbms/src/Interpreters/Context.cpp
dbms/src/Interpreters/Context.cpp
+18
-12
dbms/src/Interpreters/Context.h
dbms/src/Interpreters/Context.h
+2
-1
dbms/src/Interpreters/DDLWorker.cpp
dbms/src/Interpreters/DDLWorker.cpp
+31
-2
dbms/src/Interpreters/DDLWorker.h
dbms/src/Interpreters/DDLWorker.h
+2
-1
dbms/src/Interpreters/InterpreterAlterQuery.cpp
dbms/src/Interpreters/InterpreterAlterQuery.cpp
+1
-1
dbms/src/Interpreters/InterpreterCreateQuery.cpp
dbms/src/Interpreters/InterpreterCreateQuery.cpp
+8
-2
dbms/src/Interpreters/InterpreterDropQuery.cpp
dbms/src/Interpreters/InterpreterDropQuery.cpp
+1
-1
dbms/src/Interpreters/InterpreterKillQueryQuery.cpp
dbms/src/Interpreters/InterpreterKillQueryQuery.cpp
+0
-1
dbms/src/Interpreters/InterpreterRenameQuery.cpp
dbms/src/Interpreters/InterpreterRenameQuery.cpp
+10
-1
dbms/src/Interpreters/ProcessList.cpp
dbms/src/Interpreters/ProcessList.cpp
+71
-13
dbms/src/Interpreters/ProcessList.h
dbms/src/Interpreters/ProcessList.h
+8
-2
dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.cpp
dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.cpp
+42
-0
dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.h
dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.h
+8
-34
dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
...torages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
+2
-8
dbms/src/Storages/StorageReplicatedMergeTree.cpp
dbms/src/Storages/StorageReplicatedMergeTree.cpp
+17
-0
dbms/src/Storages/StorageReplicatedMergeTree.h
dbms/src/Storages/StorageReplicatedMergeTree.h
+4
-0
dbms/tests/integration/test_distributed_ddl/configs/users.d/restricted_user.xml
.../test_distributed_ddl/configs/users.d/restricted_user.xml
+16
-0
dbms/tests/integration/test_distributed_ddl/test.py
dbms/tests/integration/test_distributed_ddl/test.py
+18
-0
dbms/tests/queries/0_stateless/00446_clear_column_in_partition_zookeeper.sql
...0_stateless/00446_clear_column_in_partition_zookeeper.sql
+0
-8
dbms/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.reference
...s/00620_optimize_on_nonleader_replica_zookeeper.reference
+2
-0
dbms/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.sql
...ateless/00620_optimize_on_nonleader_replica_zookeeper.sql
+20
-0
未找到文件。
dbms/src/DataStreams/IBlockInputStream.h
浏览文件 @
089e71d9
...
...
@@ -3,6 +3,7 @@
#include <vector>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <functional>
#include <boost/noncopyable.hpp>
#include <Core/Block.h>
...
...
@@ -108,7 +109,9 @@ public:
template
<
typename
F
>
void
forEachChild
(
F
&&
f
)
{
std
::
lock_guard
lock
(
children_mutex
);
/// NOTE: Acquire a read lock, therefore f() should be thread safe
std
::
shared_lock
lock
(
children_mutex
);
for
(
auto
&
child
:
children
)
if
(
f
(
*
child
))
return
;
...
...
@@ -116,7 +119,7 @@ public:
protected:
BlockInputStreams
children
;
std
::
mutex
children_mutex
;
std
::
shared_
mutex
children_mutex
;
private:
TableStructureReadLocks
table_locks
;
...
...
dbms/src/DataStreams/IProfilingBlockInputStream.h
浏览文件 @
089e71d9
...
...
@@ -190,7 +190,7 @@ protected:
void
addChild
(
BlockInputStreamPtr
&
child
)
{
std
::
lock_guard
lock
(
children_mutex
);
std
::
unique_lock
lock
(
children_mutex
);
children
.
push_back
(
child
);
}
...
...
@@ -231,7 +231,9 @@ private:
template
<
typename
F
>
void
forEachProfilingChild
(
F
&&
f
)
{
std
::
lock_guard
lock
(
children_mutex
);
/// NOTE: Acquire a read lock, therefore f() should be thread safe
std
::
shared_lock
lock
(
children_mutex
);
for
(
auto
&
child
:
children
)
if
(
IProfilingBlockInputStream
*
p_child
=
dynamic_cast
<
IProfilingBlockInputStream
*>
(
child
.
get
()))
if
(
f
(
*
p_child
))
...
...
dbms/src/Interpreters/Context.cpp
浏览文件 @
089e71d9
...
...
@@ -589,6 +589,12 @@ QuotaForIntervals & Context::getQuota()
}
void
Context
::
checkDatabaseAccessRights
(
const
std
::
string
&
database_name
)
const
{
auto
lock
=
getLock
();
checkDatabaseAccessRightsImpl
(
database_name
);
}
void
Context
::
checkDatabaseAccessRightsImpl
(
const
std
::
string
&
database_name
)
const
{
if
(
client_info
.
current_user
.
empty
()
||
(
database_name
==
"system"
))
{
...
...
@@ -603,8 +609,8 @@ void Context::checkDatabaseAccessRights(const std::string & database_name) const
void
Context
::
addDependency
(
const
DatabaseAndTableName
&
from
,
const
DatabaseAndTableName
&
where
)
{
auto
lock
=
getLock
();
checkDatabaseAccessRights
(
from
.
first
);
checkDatabaseAccessRights
(
where
.
first
);
checkDatabaseAccessRights
Impl
(
from
.
first
);
checkDatabaseAccessRights
Impl
(
where
.
first
);
shared
->
view_dependencies
[
from
].
insert
(
where
);
// Notify table of dependencies change
...
...
@@ -616,8 +622,8 @@ void Context::addDependency(const DatabaseAndTableName & from, const DatabaseAnd
void
Context
::
removeDependency
(
const
DatabaseAndTableName
&
from
,
const
DatabaseAndTableName
&
where
)
{
auto
lock
=
getLock
();
checkDatabaseAccessRights
(
from
.
first
);
checkDatabaseAccessRights
(
where
.
first
);
checkDatabaseAccessRights
Impl
(
from
.
first
);
checkDatabaseAccessRights
Impl
(
where
.
first
);
shared
->
view_dependencies
[
from
].
erase
(
where
);
// Notify table of dependencies change
...
...
@@ -638,7 +644,7 @@ Dependencies Context::getDependencies(const String & database_name, const String
}
else
{
checkDatabaseAccessRights
(
db
);
checkDatabaseAccessRights
Impl
(
db
);
}
ViewDependencies
::
const_iterator
iter
=
shared
->
view_dependencies
.
find
(
DatabaseAndTableName
(
db
,
table_name
));
...
...
@@ -653,7 +659,7 @@ bool Context::isTableExist(const String & database_name, const String & table_na
auto
lock
=
getLock
();
String
db
=
resolveDatabase
(
database_name
,
current_database
);
checkDatabaseAccessRights
(
db
);
checkDatabaseAccessRights
Impl
(
db
);
Databases
::
const_iterator
it
=
shared
->
databases
.
find
(
db
);
return
shared
->
databases
.
end
()
!=
it
...
...
@@ -665,7 +671,7 @@ bool Context::isDatabaseExist(const String & database_name) const
{
auto
lock
=
getLock
();
String
db
=
resolveDatabase
(
database_name
,
current_database
);
checkDatabaseAccessRights
(
db
);
checkDatabaseAccessRights
Impl
(
db
);
return
shared
->
databases
.
end
()
!=
shared
->
databases
.
find
(
db
);
}
...
...
@@ -680,7 +686,7 @@ void Context::assertTableExists(const String & database_name, const String & tab
auto
lock
=
getLock
();
String
db
=
resolveDatabase
(
database_name
,
current_database
);
checkDatabaseAccessRights
(
db
);
checkDatabaseAccessRights
Impl
(
db
);
Databases
::
const_iterator
it
=
shared
->
databases
.
find
(
db
);
if
(
shared
->
databases
.
end
()
==
it
)
...
...
@@ -697,7 +703,7 @@ void Context::assertTableDoesntExist(const String & database_name, const String
String
db
=
resolveDatabase
(
database_name
,
current_database
);
if
(
check_database_access_rights
)
checkDatabaseAccessRights
(
db
);
checkDatabaseAccessRights
Impl
(
db
);
Databases
::
const_iterator
it
=
shared
->
databases
.
find
(
db
);
if
(
shared
->
databases
.
end
()
!=
it
&&
it
->
second
->
isTableExist
(
*
this
,
table_name
))
...
...
@@ -711,7 +717,7 @@ void Context::assertDatabaseExists(const String & database_name, bool check_data
String
db
=
resolveDatabase
(
database_name
,
current_database
);
if
(
check_database_access_rights
)
checkDatabaseAccessRights
(
db
);
checkDatabaseAccessRights
Impl
(
db
);
if
(
shared
->
databases
.
end
()
==
shared
->
databases
.
find
(
db
))
throw
Exception
(
"Database "
+
backQuoteIfNeed
(
db
)
+
" doesn't exist"
,
ErrorCodes
::
UNKNOWN_DATABASE
);
...
...
@@ -723,7 +729,7 @@ void Context::assertDatabaseDoesntExist(const String & database_name) const
auto
lock
=
getLock
();
String
db
=
resolveDatabase
(
database_name
,
current_database
);
checkDatabaseAccessRights
(
db
);
checkDatabaseAccessRights
Impl
(
db
);
if
(
shared
->
databases
.
end
()
!=
shared
->
databases
.
find
(
db
))
throw
Exception
(
"Database "
+
backQuoteIfNeed
(
db
)
+
" already exists."
,
ErrorCodes
::
DATABASE_ALREADY_EXISTS
);
...
...
@@ -790,7 +796,7 @@ StoragePtr Context::getTableImpl(const String & database_name, const String & ta
}
String
db
=
resolveDatabase
(
database_name
,
current_database
);
checkDatabaseAccessRights
(
db
);
checkDatabaseAccessRights
Impl
(
db
);
Databases
::
const_iterator
it
=
shared
->
databases
.
find
(
db
);
if
(
shared
->
databases
.
end
()
==
it
)
...
...
dbms/src/Interpreters/Context.h
浏览文件 @
089e71d9
...
...
@@ -178,6 +178,7 @@ public:
void
assertDatabaseExists
(
const
String
&
database_name
,
bool
check_database_acccess_rights
=
true
)
const
;
void
assertDatabaseDoesntExist
(
const
String
&
database_name
)
const
;
void
checkDatabaseAccessRights
(
const
std
::
string
&
database_name
)
const
;
Tables
getExternalTables
()
const
;
StoragePtr
tryGetExternalTable
(
const
String
&
table_name
)
const
;
...
...
@@ -392,7 +393,7 @@ private:
* If access is denied, throw an exception.
* NOTE: This method should always be called when the `shared->mutex` mutex is acquired.
*/
void
checkDatabaseAccessRights
(
const
std
::
string
&
database_name
)
const
;
void
checkDatabaseAccessRights
Impl
(
const
std
::
string
&
database_name
)
const
;
EmbeddedDictionaries
&
getEmbeddedDictionariesImpl
(
bool
throw_on_error
)
const
;
ExternalDictionaries
&
getExternalDictionariesImpl
(
bool
throw_on_error
)
const
;
...
...
dbms/src/Interpreters/DDLWorker.cpp
浏览文件 @
089e71d9
...
...
@@ -960,15 +960,25 @@ public:
{
Block
res
;
if
(
num_hosts_finished
>=
waiting_hosts
.
size
())
{
if
(
first_exception
)
throw
Exception
(
*
first_exception
);
return
res
;
}
auto
zookeeper
=
context
.
getZooKeeper
();
size_t
try_number
=
0
;
while
(
res
.
rows
()
==
0
)
while
(
res
.
rows
()
==
0
)
{
if
(
isCancelled
())
{
if
(
first_exception
)
throw
Exception
(
*
first_exception
);
return
res
;
}
if
(
timeout_seconds
>=
0
&&
watch
.
elapsedSeconds
()
>
timeout_seconds
)
{
...
...
@@ -1020,6 +1030,9 @@ public:
UInt16
port
;
Cluster
::
Address
::
fromString
(
host_id
,
host
,
port
);
if
(
status
.
code
!=
0
&&
first_exception
==
nullptr
)
first_exception
=
std
::
make_unique
<
Exception
>
(
"There was an error on "
+
host
+
": "
+
status
.
message
,
status
.
code
);
++
num_hosts_finished
;
columns
[
0
]
->
insert
(
host
);
...
...
@@ -1092,11 +1105,14 @@ private:
Strings
current_active_hosts
;
/// Hosts that were in active state at the last check
size_t
num_hosts_finished
=
0
;
/// Save the first detected error and throw it at the end of excecution
std
::
unique_ptr
<
Exception
>
first_exception
;
Int64
timeout_seconds
=
120
;
};
BlockIO
executeDDLQueryOnCluster
(
const
ASTPtr
&
query_ptr_
,
const
Context
&
context
)
BlockIO
executeDDLQueryOnCluster
(
const
ASTPtr
&
query_ptr_
,
const
Context
&
context
,
const
NameSet
&
query_databases
)
{
/// Remove FORMAT <fmt> and INTO OUTFILE <file> if exists
ASTPtr
query_ptr
=
query_ptr_
->
clone
();
...
...
@@ -1128,13 +1144,26 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
entry
.
query
=
queryToString
(
query_ptr
);
entry
.
initiator
=
ddl_worker
.
getCommonHostID
();
/// Check database access rights, assume that all servers have the same users config
NameSet
databases_to_check_access_rights
;
Cluster
::
AddressesWithFailover
shards
=
cluster
->
getShardsAddresses
();
for
(
const
auto
&
shard
:
shards
)
{
for
(
const
auto
&
addr
:
shard
)
{
entry
.
hosts
.
emplace_back
(
addr
);
/// Expand empty database name to shards' default database name
for
(
const
String
&
database
:
query_databases
)
databases_to_check_access_rights
.
emplace
(
database
.
empty
()
?
addr
.
default_database
:
database
);
}
}
for
(
const
String
&
database
:
databases_to_check_access_rights
)
context
.
checkDatabaseAccessRights
(
database
.
empty
()
?
context
.
getCurrentDatabase
()
:
database
);
String
node_path
=
ddl_worker
.
enqueueQuery
(
entry
);
BlockIO
io
;
...
...
dbms/src/Interpreters/DDLWorker.h
浏览文件 @
089e71d9
...
...
@@ -18,7 +18,8 @@ struct DDLLogEntry;
struct
DDLTask
;
BlockIO
executeDDLQueryOnCluster
(
const
ASTPtr
&
query_ptr
,
const
Context
&
context
);
/// Pushes distributed DDL query to the queue
BlockIO
executeDDLQueryOnCluster
(
const
ASTPtr
&
query_ptr
,
const
Context
&
context
,
const
NameSet
&
query_databases
);
class
DDLWorker
...
...
dbms/src/Interpreters/InterpreterAlterQuery.cpp
浏览文件 @
089e71d9
...
...
@@ -42,7 +42,7 @@ BlockIO InterpreterAlterQuery::execute()
auto
&
alter
=
typeid_cast
<
ASTAlterQuery
&>
(
*
query_ptr
);
if
(
!
alter
.
cluster
.
empty
())
return
executeDDLQueryOnCluster
(
query_ptr
,
context
);
return
executeDDLQueryOnCluster
(
query_ptr
,
context
,
{
alter
.
table
}
);
const
String
&
table_name
=
alter
.
table
;
String
database_name
=
alter
.
database
.
empty
()
?
context
.
getCurrentDatabase
()
:
alter
.
database
;
...
...
dbms/src/Interpreters/InterpreterCreateQuery.cpp
浏览文件 @
089e71d9
...
...
@@ -66,7 +66,7 @@ InterpreterCreateQuery::InterpreterCreateQuery(const ASTPtr & query_ptr_, Contex
BlockIO
InterpreterCreateQuery
::
createDatabase
(
ASTCreateQuery
&
create
)
{
if
(
!
create
.
cluster
.
empty
())
return
executeDDLQueryOnCluster
(
query_ptr
,
context
);
return
executeDDLQueryOnCluster
(
query_ptr
,
context
,
{
create
.
database
}
);
String
database_name
=
create
.
database
;
...
...
@@ -439,7 +439,13 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
BlockIO
InterpreterCreateQuery
::
createTable
(
ASTCreateQuery
&
create
)
{
if
(
!
create
.
cluster
.
empty
())
return
executeDDLQueryOnCluster
(
query_ptr
,
context
);
{
NameSet
databases
{
create
.
database
};
if
(
!
create
.
to_table
.
empty
())
databases
.
emplace
(
create
.
to_database
);
return
executeDDLQueryOnCluster
(
query_ptr
,
context
,
databases
);
}
String
path
=
context
.
getPath
();
String
current_database
=
context
.
getCurrentDatabase
();
...
...
dbms/src/Interpreters/InterpreterDropQuery.cpp
浏览文件 @
089e71d9
...
...
@@ -32,7 +32,7 @@ BlockIO InterpreterDropQuery::execute()
checkAccess
(
drop
);
if
(
!
drop
.
cluster
.
empty
())
return
executeDDLQueryOnCluster
(
query_ptr
,
context
);
return
executeDDLQueryOnCluster
(
query_ptr
,
context
,
{
drop
.
database
}
);
String
path
=
context
.
getPath
();
String
current_database
=
context
.
getCurrentDatabase
();
...
...
dbms/src/Interpreters/InterpreterKillQueryQuery.cpp
浏览文件 @
089e71d9
...
...
@@ -147,7 +147,6 @@ public:
}
/// KILL QUERY could be killed also
/// Probably interpreting KILL QUERIES as complete (not internal) queries is extra functionality
if
(
isCancelled
())
break
;
...
...
dbms/src/Interpreters/InterpreterRenameQuery.cpp
浏览文件 @
089e71d9
...
...
@@ -39,7 +39,16 @@ BlockIO InterpreterRenameQuery::execute()
ASTRenameQuery
&
rename
=
typeid_cast
<
ASTRenameQuery
&>
(
*
query_ptr
);
if
(
!
rename
.
cluster
.
empty
())
return
executeDDLQueryOnCluster
(
query_ptr
,
context
);
{
NameSet
databases
;
for
(
const
auto
&
elem
:
rename
.
elements
)
{
databases
.
emplace
(
elem
.
from
.
database
);
databases
.
emplace
(
elem
.
to
.
database
);
}
return
executeDDLQueryOnCluster
(
query_ptr
,
context
,
databases
);
}
String
path
=
context
.
getPath
();
String
current_database
=
context
.
getCurrentDatabase
();
...
...
dbms/src/Interpreters/ProcessList.cpp
浏览文件 @
089e71d9
#include <Interpreters/ProcessList.h>
#include <Interpreters/Settings.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTKillQueryQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
#include <DataStreams/IProfilingBlockInputStream.h>
...
...
@@ -19,21 +22,70 @@ namespace ErrorCodes
}
/// Should we execute the query even if max_concurrent_queries limit is exhausted
static
bool
isUnlimitedQuery
(
const
IAST
*
ast
)
{
if
(
!
ast
)
return
false
;
/// It is KILL QUERY
if
(
typeid_cast
<
const
ASTKillQueryQuery
*>
(
ast
))
return
true
;
/// It is SELECT FROM system.processes
if
(
auto
ast_selects
=
typeid_cast
<
const
ASTSelectWithUnionQuery
*>
(
ast
))
{
if
(
!
ast_selects
->
list_of_selects
||
ast_selects
->
list_of_selects
->
children
.
empty
())
return
false
;
auto
ast_select
=
typeid_cast
<
ASTSelectQuery
*>
(
ast_selects
->
list_of_selects
->
children
[
0
].
get
());
if
(
!
ast_select
)
return
false
;
auto
ast_database
=
ast_select
->
database
();
if
(
!
ast_database
)
return
false
;
auto
ast_table
=
ast_select
->
table
();
if
(
!
ast_table
)
return
false
;
auto
ast_database_id
=
typeid_cast
<
const
ASTIdentifier
*>
(
ast_database
.
get
());
if
(
!
ast_database_id
)
return
false
;
auto
ast_table_id
=
typeid_cast
<
const
ASTIdentifier
*>
(
ast_table
.
get
());
if
(
!
ast_table_id
)
return
false
;
return
ast_database_id
->
name
==
"system"
&&
ast_table_id
->
name
==
"processes"
;
}
return
false
;
}
ProcessList
::
EntryPtr
ProcessList
::
insert
(
const
String
&
query_
,
const
IAST
*
ast
,
const
ClientInfo
&
client_info
,
const
Settings
&
settings
)
{
EntryPtr
res
;
bool
is_kill_query
=
ast
&&
typeid_cast
<
const
ASTKillQueryQuery
*>
(
ast
);
if
(
client_info
.
current_query_id
.
empty
())
throw
Exception
(
"Query id cannot be empty"
,
ErrorCodes
::
LOGICAL_ERROR
);
bool
is_unlimited_query
=
isUnlimitedQuery
(
ast
);
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
if
(
!
is_kill_query
&&
max_size
&&
cur_size
>=
max_size
&&
(
!
settings
.
queue_max_wait_ms
.
totalMilliseconds
()
||
!
have_space
.
tryWait
(
mutex
,
settings
.
queue_max_wait_ms
.
totalMilliseconds
())))
throw
Exception
(
"Too many simultaneous queries. Maximum: "
+
toString
(
max_size
),
ErrorCodes
::
TOO_MANY_SIMULTANEOUS_QUERIES
);
if
(
!
is_unlimited_query
&&
max_size
&&
cur_size
>=
max_size
)
{
if
(
!
settings
.
queue_max_wait_ms
.
totalMilliseconds
()
||
!
have_space
.
tryWait
(
mutex
,
settings
.
queue_max_wait_ms
.
totalMilliseconds
()))
{
throw
Exception
(
"Too many simultaneous queries. Maximum: "
+
toString
(
max_size
),
ErrorCodes
::
TOO_MANY_SIMULTANEOUS_QUERIES
);
}
}
/** Why we use current user?
* Because initial one is passed by client and credentials for it is not verified,
...
...
@@ -50,7 +102,7 @@ ProcessList::EntryPtr ProcessList::insert(
if
(
user_process_list
!=
user_to_queries
.
end
())
{
if
(
!
is_
kill
_query
&&
settings
.
max_concurrent_queries_for_user
if
(
!
is_
unlimited
_query
&&
settings
.
max_concurrent_queries_for_user
&&
user_process_list
->
second
.
queries
.
size
()
>=
settings
.
max_concurrent_queries_for_user
)
throw
Exception
(
"Too many simultaneous queries for user "
+
client_info
.
current_user
+
". Current: "
+
toString
(
user_process_list
->
second
.
queries
.
size
())
...
...
@@ -191,31 +243,37 @@ void ProcessListElement::setQueryStreams(const BlockIO & io)
query_stream_in
=
io
.
in
;
query_stream_out
=
io
.
out
;
query_streams_
initialized
=
true
;
query_streams_
status
=
QueryStreamsStatus
::
Initialized
;
}
void
ProcessListElement
::
releaseQueryStreams
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
query_streams_mutex
);
BlockInputStreamPtr
in
;
BlockOutputStreamPtr
out
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
query_streams_mutex
);
query_streams_status
=
QueryStreamsStatus
::
Released
;
in
=
std
::
move
(
query_stream_in
);
out
=
std
::
move
(
query_stream_out
);
}
query_streams_initialized
=
false
;
query_streams_released
=
true
;
query_stream_in
.
reset
();
query_stream_out
.
reset
();
/// Destroy streams outside the mutex lock
}
bool
ProcessListElement
::
streamsAreReleased
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
query_streams_mutex
);
return
query_streams_
r
eleased
;
return
query_streams_
status
==
QueryStreamsStatus
::
R
eleased
;
}
bool
ProcessListElement
::
tryGetQueryStreams
(
BlockInputStreamPtr
&
in
,
BlockOutputStreamPtr
&
out
)
const
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
query_streams_mutex
);
if
(
!
query_streams_i
nitialized
)
if
(
query_streams_status
!=
QueryStreamsStatus
::
I
nitialized
)
return
false
;
in
=
query_stream_in
;
...
...
dbms/src/Interpreters/ProcessList.h
浏览文件 @
089e71d9
...
...
@@ -91,8 +91,14 @@ private:
BlockInputStreamPtr
query_stream_in
;
BlockOutputStreamPtr
query_stream_out
;
bool
query_streams_initialized
{
false
};
bool
query_streams_released
{
false
};
enum
QueryStreamsStatus
{
NotInitialized
,
Initialized
,
Released
};
QueryStreamsStatus
query_streams_status
{
NotInitialized
};
public:
ProcessListElement
(
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.cpp
0 → 100644
浏览文件 @
089e71d9
#include "ReplicatedMergeTreeAddress.h"
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
namespace
DB
{
void
ReplicatedMergeTreeAddress
::
writeText
(
WriteBuffer
&
out
)
const
{
out
<<
"host: "
<<
escape
<<
host
<<
'\n'
<<
"port: "
<<
replication_port
<<
'\n'
<<
"tcp_port: "
<<
queries_port
<<
'\n'
<<
"database: "
<<
escape
<<
database
<<
'\n'
<<
"table: "
<<
escape
<<
table
<<
'\n'
;
}
void
ReplicatedMergeTreeAddress
::
readText
(
ReadBuffer
&
in
)
{
in
>>
"host: "
>>
escape
>>
host
>>
"
\n
"
>>
"port: "
>>
replication_port
>>
"
\n
"
>>
"tcp_port: "
>>
queries_port
>>
"
\n
"
>>
"database: "
>>
escape
>>
database
>>
"
\n
"
>>
"table: "
>>
escape
>>
table
>>
"
\n
"
;
}
String
ReplicatedMergeTreeAddress
::
toString
()
const
{
WriteBufferFromOwnString
out
;
writeText
(
out
);
return
out
.
str
();
}
void
ReplicatedMergeTreeAddress
::
fromString
(
const
String
&
str
)
{
ReadBufferFromString
in
(
str
);
readText
(
in
);
}
}
dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.h
浏览文件 @
089e71d9
#pragma once
#include <Core/Types.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
namespace
DB
...
...
@@ -18,44 +17,19 @@ struct ReplicatedMergeTreeAddress
String
database
;
String
table
;
ReplicatedMergeTreeAddress
()
{}
ReplicatedMergeTreeAddress
(
const
String
&
str
)
ReplicatedMergeTreeAddress
()
=
default
;
explicit
ReplicatedMergeTreeAddress
(
const
String
&
str
)
{
fromString
(
str
);
}
void
writeText
(
WriteBuffer
&
out
)
const
{
out
<<
"host: "
<<
escape
<<
host
<<
'\n'
<<
"port: "
<<
replication_port
<<
'\n'
<<
"tcp_port: "
<<
queries_port
<<
'\n'
<<
"database: "
<<
escape
<<
database
<<
'\n'
<<
"table: "
<<
escape
<<
table
<<
'\n'
;
}
void
writeText
(
WriteBuffer
&
out
)
const
;
void
readText
(
ReadBuffer
&
in
)
{
in
>>
"host: "
>>
escape
>>
host
>>
"
\n
"
>>
"port: "
>>
replication_port
>>
"
\n
"
>>
"tcp_port: "
>>
queries_port
>>
"
\n
"
>>
"database: "
>>
escape
>>
database
>>
"
\n
"
>>
"table: "
>>
escape
>>
table
>>
"
\n
"
;
}
void
readText
(
ReadBuffer
&
in
);
String
toString
()
const
{
WriteBufferFromOwnString
out
;
writeText
(
out
);
return
out
.
str
();
}
String
toString
()
const
;
void
fromString
(
const
String
&
str
)
{
ReadBufferFromString
in
(
str
);
readText
(
in
);
}
void
fromString
(
const
String
&
str
);
};
}
dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
浏览文件 @
089e71d9
...
...
@@ -292,16 +292,10 @@ void ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart()
void
ReplicatedMergeTreeRestartingThread
::
activateReplica
()
{
auto
host_port
=
storage
.
context
.
getInterserverIOAddress
();
auto
zookeeper
=
storage
.
getZooKeeper
();
/// How other replicas can access this.
ReplicatedMergeTreeAddress
address
;
address
.
host
=
host_port
.
first
;
address
.
replication_port
=
host_port
.
second
;
address
.
queries_port
=
storage
.
context
.
getTCPPort
();
address
.
database
=
storage
.
database_name
;
address
.
table
=
storage
.
table_name
;
/// How other replicas can access this one.
ReplicatedMergeTreeAddress
address
=
storage
.
getReplicatedMergeTreeAddress
();
String
is_active_path
=
storage
.
replica_path
+
"/is_active"
;
...
...
dbms/src/Storages/StorageReplicatedMergeTree.cpp
浏览文件 @
089e71d9
...
...
@@ -3004,6 +3004,10 @@ void StorageReplicatedMergeTree::rename(const String & new_path_to_db, const Str
table_name
=
new_table_name
;
full_path
=
new_full_path
;
/// Update table name in zookeeper
auto
zookeeper
=
getZooKeeper
();
zookeeper
->
set
(
replica_path
+
"/host"
,
getReplicatedMergeTreeAddress
().
toString
());
/// TODO: You can update names of loggers.
}
...
...
@@ -3766,4 +3770,17 @@ void StorageReplicatedMergeTree::clearBlocksInPartition(
LOG_TRACE
(
log
,
"Deleted "
<<
to_delete_futures
.
size
()
<<
" deduplication block IDs in partition ID "
<<
partition_id
);
}
ReplicatedMergeTreeAddress
StorageReplicatedMergeTree
::
getReplicatedMergeTreeAddress
()
const
{
auto
host_port
=
context
.
getInterserverIOAddress
();
ReplicatedMergeTreeAddress
res
;
res
.
host
=
host_port
.
first
;
res
.
replication_port
=
host_port
.
second
;
res
.
queries_port
=
context
.
getTCPPort
();
res
.
database
=
database_name
;
res
.
table
=
table_name
;
return
res
;
}
}
dbms/src/Storages/StorageReplicatedMergeTree.h
浏览文件 @
089e71d9
...
...
@@ -17,6 +17,7 @@
#include <Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <DataTypes/DataTypesNumber.h>
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
...
...
@@ -451,6 +452,9 @@ private:
void
clearBlocksInPartition
(
zkutil
::
ZooKeeper
&
zookeeper
,
const
String
&
partition_id
,
Int64
min_block_num
,
Int64
max_block_num
);
/// Info about how other replicas can access this one.
ReplicatedMergeTreeAddress
getReplicatedMergeTreeAddress
()
const
;
protected:
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
*/
...
...
dbms/tests/integration/test_distributed_ddl/configs/users.d/restricted_user.xml
0 → 100644
浏览文件 @
089e71d9
<yandex>
<users>
<restricted_user>
<password></password>
<profile>
default
</profile>
<quota>
default
</quota>
<networks>
<ip>
::/0
</ip>
</networks>
<allow_databases>
<database>
db1
</database>
</allow_databases>
</restricted_user>
</users>
</yandex>
dbms/tests/integration/test_distributed_ddl/test.py
浏览文件 @
089e71d9
...
...
@@ -315,6 +315,24 @@ def test_macro(started_cluster):
ddl_check_query
(
instance
,
"DROP TABLE IF EXISTS distr ON CLUSTER '{cluster}'"
)
ddl_check_query
(
instance
,
"DROP TABLE IF EXISTS tab ON CLUSTER '{cluster}'"
)
def
test_allowed_databases
(
started_cluster
):
instance
=
cluster
.
instances
[
'ch2'
]
instance
.
query
(
"CREATE DATABASE IF NOT EXISTS db1 ON CLUSTER cluster"
)
instance
.
query
(
"CREATE DATABASE IF NOT EXISTS db2 ON CLUSTER cluster"
)
instance
.
query
(
"CREATE TABLE db1.t1 ON CLUSTER cluster (i Int8) ENGINE = Memory"
,
settings
=
{
"user"
:
"restricted_user"
})
with
pytest
.
raises
(
Exception
):
instance
.
query
(
"CREATE TABLE db2.t2 ON CLUSTER cluster (i Int8) ENGINE = Memory"
,
settings
=
{
"user"
:
"restricted_user"
})
with
pytest
.
raises
(
Exception
):
instance
.
query
(
"CREATE TABLE t3 ON CLUSTER cluster (i Int8) ENGINE = Memory"
,
settings
=
{
"user"
:
"restricted_user"
})
with
pytest
.
raises
(
Exception
):
instance
.
query
(
"DROP DATABASE db2 ON CLUSTER cluster"
,
settings
=
{
"user"
:
"restricted_user"
})
instance
.
query
(
"DROP DATABASE db1 ON CLUSTER cluster"
,
settings
=
{
"user"
:
"restricted_user"
})
if
__name__
==
'__main__'
:
with
contextmanager
(
started_cluster
)()
as
cluster
:
for
name
,
instance
in
cluster
.
instances
.
items
():
...
...
dbms/tests/queries/0_stateless/00446_clear_column_in_partition_zookeeper.sql
浏览文件 @
089e71d9
...
...
@@ -61,11 +61,3 @@ ALTER TABLE test.clear_column1 CLEAR COLUMN s IN PARTITION '200002';
ALTER
TABLE
test
.
clear_column1
CLEAR
COLUMN
s
IN
PARTITION
'200012'
,
CLEAR
COLUMN
i
IN
PARTITION
'200012'
;
-- Drop empty partition also Ok
ALTER
TABLE
test
.
clear_column1
DROP
PARTITION
'200012'
,
DROP
PARTITION
'200011'
;
-- check optimize for non-leader replica (it is not related with CLEAR COLUMN)
OPTIMIZE
TABLE
test
.
clear_column1
;
OPTIMIZE
TABLE
test
.
clear_column2
;
DROP
TABLE
IF
EXISTS
test
.
clear_column1
;
DROP
TABLE
IF
EXISTS
test
.
clear_column2
;
dbms/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.reference
0 → 100644
浏览文件 @
089e71d9
0 1 1
0 1 2
dbms/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.sql
0 → 100644
浏览文件 @
089e71d9
DROP
TABLE
IF
EXISTS
test
.
clear_column1
;
DROP
TABLE
IF
EXISTS
test
.
clear_column2
;
CREATE
TABLE
test
.
clear_column1
(
p
Int64
,
i
Int64
,
v
UInt64
)
ENGINE
=
ReplicatedReplacingMergeTree
(
'/clickhouse/tables/test/clear_column'
,
'1'
,
v
)
PARTITION
BY
p
ORDER
BY
i
;
CREATE
TABLE
test
.
clear_column2
(
p
Int64
,
i
Int64
,
v
UInt64
)
ENGINE
=
ReplicatedReplacingMergeTree
(
'/clickhouse/tables/test/clear_column'
,
'2'
,
v
)
PARTITION
BY
p
ORDER
BY
i
;
INSERT
INTO
test
.
clear_column1
VALUES
(
0
,
1
,
0
);
INSERT
INTO
test
.
clear_column1
VALUES
(
0
,
1
,
1
);
OPTIMIZE
TABLE
test
.
clear_column1
;
OPTIMIZE
TABLE
test
.
clear_column2
;
SELECT
*
FROM
test
.
clear_column1
;
RENAME
TABLE
test
.
clear_column2
TO
test
.
clear_column3
;
INSERT
INTO
test
.
clear_column1
VALUES
(
0
,
1
,
2
);
OPTIMIZE
TABLE
test
.
clear_column3
;
SELECT
*
FROM
test
.
clear_column1
;
DROP
TABLE
IF
EXISTS
test
.
clear_column1
;
DROP
TABLE
IF
EXISTS
test
.
clear_column2
;
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录