Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
6613e567
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
6613e567
编写于
4月 21, 2018
作者:
Z
zhang2014
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
ISSUES-2259 support truncate syntax
上级
de15c44f
变更
51
隐藏空白更改
内联
并排
Showing
51 changed file
with
821 addition
and
219 deletion
+821
-219
dbms/src/Common/ZooKeeper/Types.h
dbms/src/Common/ZooKeeper/Types.h
+3
-0
dbms/src/Databases/DatabaseDictionary.cpp
dbms/src/Databases/DatabaseDictionary.cpp
+0
-5
dbms/src/Databases/DatabaseDictionary.h
dbms/src/Databases/DatabaseDictionary.h
+0
-1
dbms/src/Databases/DatabaseMemory.cpp
dbms/src/Databases/DatabaseMemory.cpp
+0
-5
dbms/src/Databases/DatabaseMemory.h
dbms/src/Databases/DatabaseMemory.h
+0
-2
dbms/src/Databases/DatabaseOrdinary.cpp
dbms/src/Databases/DatabaseOrdinary.cpp
+0
-6
dbms/src/Databases/DatabaseOrdinary.h
dbms/src/Databases/DatabaseOrdinary.h
+0
-1
dbms/src/Databases/IDatabase.h
dbms/src/Databases/IDatabase.h
+7
-1
dbms/src/Interpreters/ClusterProxy/DescribeStreamFactory.cpp
dbms/src/Interpreters/ClusterProxy/DescribeStreamFactory.cpp
+1
-1
dbms/src/Interpreters/ClusterProxy/DescribeStreamFactory.h
dbms/src/Interpreters/ClusterProxy/DescribeStreamFactory.h
+1
-1
dbms/src/Interpreters/ClusterProxy/IStreamFactory.h
dbms/src/Interpreters/ClusterProxy/IStreamFactory.h
+2
-2
dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp
dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp
+1
-1
dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.h
dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.h
+1
-1
dbms/src/Interpreters/ClusterProxy/TruncateStreamFactory.cpp
dbms/src/Interpreters/ClusterProxy/TruncateStreamFactory.cpp
+91
-0
dbms/src/Interpreters/ClusterProxy/TruncateStreamFactory.h
dbms/src/Interpreters/ClusterProxy/TruncateStreamFactory.h
+34
-0
dbms/src/Interpreters/ClusterProxy/executeQuery.cpp
dbms/src/Interpreters/ClusterProxy/executeQuery.cpp
+1
-1
dbms/src/Interpreters/InterpreterDropQuery.cpp
dbms/src/Interpreters/InterpreterDropQuery.cpp
+129
-111
dbms/src/Interpreters/InterpreterDropQuery.h
dbms/src/Interpreters/InterpreterDropQuery.h
+13
-1
dbms/src/Parsers/ASTDropQuery.h
dbms/src/Parsers/ASTDropQuery.h
+43
-11
dbms/src/Parsers/ParserDropQuery.cpp
dbms/src/Parsers/ParserDropQuery.cpp
+37
-11
dbms/src/Parsers/ParserDropQuery.h
dbms/src/Parsers/ParserDropQuery.h
+5
-1
dbms/src/Storages/Distributed/DirectoryMonitor.cpp
dbms/src/Storages/Distributed/DirectoryMonitor.cpp
+24
-7
dbms/src/Storages/Distributed/DirectoryMonitor.h
dbms/src/Storages/Distributed/DirectoryMonitor.h
+1
-0
dbms/src/Storages/IStorage.h
dbms/src/Storages/IStorage.h
+8
-0
dbms/src/Storages/MergeTree/MergeTreeData.cpp
dbms/src/Storages/MergeTree/MergeTreeData.cpp
+11
-1
dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
...c/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
+2
-2
dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
+2
-2
dbms/src/Storages/StorageDistributed.cpp
dbms/src/Storages/StorageDistributed.cpp
+31
-0
dbms/src/Storages/StorageDistributed.h
dbms/src/Storages/StorageDistributed.h
+5
-0
dbms/src/Storages/StorageLog.cpp
dbms/src/Storages/StorageLog.cpp
+24
-2
dbms/src/Storages/StorageLog.h
dbms/src/Storages/StorageLog.h
+2
-0
dbms/src/Storages/StorageMaterializedView.cpp
dbms/src/Storages/StorageMaterializedView.cpp
+15
-0
dbms/src/Storages/StorageMaterializedView.h
dbms/src/Storages/StorageMaterializedView.h
+2
-0
dbms/src/Storages/StorageMemory.cpp
dbms/src/Storages/StorageMemory.cpp
+6
-0
dbms/src/Storages/StorageMemory.h
dbms/src/Storages/StorageMemory.h
+3
-0
dbms/src/Storages/StorageMergeTree.cpp
dbms/src/Storages/StorageMergeTree.cpp
+13
-2
dbms/src/Storages/StorageMergeTree.h
dbms/src/Storages/StorageMergeTree.h
+2
-0
dbms/src/Storages/StorageReplicatedMergeTree.cpp
dbms/src/Storages/StorageReplicatedMergeTree.cpp
+75
-39
dbms/src/Storages/StorageReplicatedMergeTree.h
dbms/src/Storages/StorageReplicatedMergeTree.h
+6
-0
dbms/src/Storages/StorageSet.cpp
dbms/src/Storages/StorageSet.cpp
+17
-1
dbms/src/Storages/StorageSet.h
dbms/src/Storages/StorageSet.h
+2
-0
dbms/src/Storages/StorageStripeLog.cpp
dbms/src/Storages/StorageStripeLog.cpp
+14
-0
dbms/src/Storages/StorageStripeLog.h
dbms/src/Storages/StorageStripeLog.h
+2
-0
dbms/src/Storages/StorageTinyLog.cpp
dbms/src/Storages/StorageTinyLog.cpp
+15
-0
dbms/src/Storages/StorageTinyLog.h
dbms/src/Storages/StorageTinyLog.h
+2
-0
dbms/tests/queries/0_stateless/00623_replicated_truncate_table.reference
...ies/0_stateless/00623_replicated_truncate_table.reference
+7
-0
dbms/tests/queries/0_stateless/00623_replicated_truncate_table.sql
...s/queries/0_stateless/00623_replicated_truncate_table.sql
+24
-0
dbms/tests/queries/0_stateless/00623_truncate_table.reference
.../tests/queries/0_stateless/00623_truncate_table.reference
+20
-0
dbms/tests/queries/0_stateless/00623_truncate_table.sql
dbms/tests/queries/0_stateless/00623_truncate_table.sql
+88
-0
dbms/tests/queries/0_stateless/00623_truncate_table_throw_exception.reference
..._stateless/00623_truncate_table_throw_exception.reference
+6
-0
dbms/tests/queries/0_stateless/00623_truncate_table_throw_exception.sh
...eries/0_stateless/00623_truncate_table_throw_exception.sh
+23
-0
未找到文件。
dbms/src/Common/ZooKeeper/Types.h
浏览文件 @
6613e567
...
...
@@ -54,6 +54,9 @@ using SetResponse = ZooKeeperImpl::ZooKeeper::SetResponse;
using
ListResponse
=
ZooKeeperImpl
::
ZooKeeper
::
ListResponse
;
using
CheckResponse
=
ZooKeeperImpl
::
ZooKeeper
::
CheckResponse
;
template
<
typename
R
>
using
MultiAsyncResponse
=
std
::
vector
<
std
::
pair
<
std
::
string
,
std
::
future
<
R
>>>
;
RequestPtr
makeCreateRequest
(
const
std
::
string
&
path
,
const
std
::
string
&
data
,
int
create_mode
);
RequestPtr
makeRemoveRequest
(
const
std
::
string
&
path
,
int
version
);
RequestPtr
makeSetRequest
(
const
std
::
string
&
path
,
const
std
::
string
&
data
,
int
version
);
...
...
dbms/src/Databases/DatabaseDictionary.cpp
浏览文件 @
6613e567
...
...
@@ -213,9 +213,4 @@ void DatabaseDictionary::shutdown()
{
}
void
DatabaseDictionary
::
drop
()
{
/// Additional actions to delete database are not required.
}
}
dbms/src/Databases/DatabaseDictionary.h
浏览文件 @
6613e567
...
...
@@ -87,7 +87,6 @@ public:
ASTPtr
getCreateDatabaseQuery
(
const
Context
&
context
)
const
override
;
void
shutdown
()
override
;
void
drop
()
override
;
private:
const
String
name
;
...
...
dbms/src/Databases/DatabaseMemory.cpp
浏览文件 @
6613e567
...
...
@@ -78,9 +78,4 @@ ASTPtr DatabaseMemory::getCreateDatabaseQuery(
throw
Exception
(
"There is no CREATE DATABASE query for DatabaseMemory"
,
ErrorCodes
::
CANNOT_GET_CREATE_TABLE_QUERY
);
}
void
DatabaseMemory
::
drop
()
{
/// Additional actions to delete database are not required.
}
}
dbms/src/Databases/DatabaseMemory.h
浏览文件 @
6613e567
...
...
@@ -57,8 +57,6 @@ public:
ASTPtr
getCreateDatabaseQuery
(
const
Context
&
context
)
const
override
;
void
drop
()
override
;
private:
Poco
::
Logger
*
log
;
};
...
...
dbms/src/Databases/DatabaseOrdinary.cpp
浏览文件 @
6613e567
...
...
@@ -507,12 +507,6 @@ void DatabaseOrdinary::shutdown()
tables
.
clear
();
}
void
DatabaseOrdinary
::
drop
()
{
/// No additional removal actions are required.
}
void
DatabaseOrdinary
::
alterTable
(
const
Context
&
context
,
const
String
&
name
,
...
...
dbms/src/Databases/DatabaseOrdinary.h
浏览文件 @
6613e567
...
...
@@ -63,7 +63,6 @@ public:
String
getTableMetadataPath
(
const
String
&
table_name
)
const
override
;
void
shutdown
()
override
;
void
drop
()
override
;
private:
const
String
metadata_path
;
...
...
dbms/src/Databases/IDatabase.h
浏览文件 @
6613e567
...
...
@@ -6,6 +6,7 @@
#include <ctime>
#include <memory>
#include <functional>
#include <Poco/File.h>
class
ThreadPool
;
...
...
@@ -143,7 +144,12 @@ public:
virtual
void
shutdown
()
=
0
;
/// Delete metadata, the deletion of which differs from the recursive deletion of the directory, if any.
virtual
void
drop
()
=
0
;
virtual
void
drop
()
{
String
metadata_path
=
getMetadataPath
();
if
(
!
metadata_path
.
empty
())
Poco
::
File
(
metadata_path
).
remove
(
false
);
};
virtual
~
IDatabase
()
{}
};
...
...
dbms/src/Interpreters/ClusterProxy/DescribeStreamFactory.cpp
浏览文件 @
6613e567
...
...
@@ -29,7 +29,7 @@ namespace ClusterProxy
void
DescribeStreamFactory
::
createForShard
(
const
Cluster
::
ShardInfo
&
shard_info
,
const
String
&
query
,
const
ASTPtr
&
query_ast
,
const
Context
&
context
,
const
ThrottlerPtr
&
throttler
,
const
ThrottlerPtr
&
throttler
,
Context
&
context
,
BlockInputStreams
&
res
)
{
for
(
const
Cluster
::
Address
&
local_address
:
shard_info
.
local_addresses
)
...
...
dbms/src/Interpreters/ClusterProxy/DescribeStreamFactory.h
浏览文件 @
6613e567
...
...
@@ -14,7 +14,7 @@ public:
void
createForShard
(
const
Cluster
::
ShardInfo
&
shard_info
,
const
String
&
query
,
const
ASTPtr
&
query_ast
,
const
Context
&
context
,
const
ThrottlerPtr
&
throttler
,
const
ThrottlerPtr
&
throttler
,
Context
&
context
,
BlockInputStreams
&
res
)
override
;
};
...
...
dbms/src/Interpreters/ClusterProxy/IStreamFactory.h
浏览文件 @
6613e567
...
...
@@ -25,8 +25,8 @@ public:
virtual
void
createForShard
(
const
Cluster
::
ShardInfo
&
shard_info
,
const
String
&
query
,
const
ASTPtr
&
query_ast
,
const
Context
&
context
,
const
ThrottlerPtr
&
throttler
,
const
String
&
query
,
const
ASTPtr
&
query_ast
,
const
ThrottlerPtr
&
throttler
,
Context
&
context
,
BlockInputStreams
&
res
)
=
0
;
};
...
...
dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp
浏览文件 @
6613e567
...
...
@@ -59,7 +59,7 @@ BlockInputStreamPtr createLocalStream(const ASTPtr & query_ast, const Context &
void
SelectStreamFactory
::
createForShard
(
const
Cluster
::
ShardInfo
&
shard_info
,
const
String
&
query
,
const
ASTPtr
&
query_ast
,
const
Context
&
context
,
const
ThrottlerPtr
&
throttler
,
const
ThrottlerPtr
&
throttler
,
Context
&
context
,
BlockInputStreams
&
res
)
{
auto
emplace_local_stream
=
[
&
]()
...
...
dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.h
浏览文件 @
6613e567
...
...
@@ -22,7 +22,7 @@ public:
void
createForShard
(
const
Cluster
::
ShardInfo
&
shard_info
,
const
String
&
query
,
const
ASTPtr
&
query_ast
,
const
Context
&
context
,
const
ThrottlerPtr
&
throttler
,
const
ThrottlerPtr
&
throttler
,
Context
&
context
,
BlockInputStreams
&
res
)
override
;
private:
...
...
dbms/src/Interpreters/ClusterProxy/TruncateStreamFactory.cpp
0 → 100644
浏览文件 @
6613e567
#include <Interpreters/ClusterProxy/TruncateStreamFactory.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <Interpreters/Cluster.h>
namespace
DB
{
namespace
ClusterProxy
{
TruncateStreamFactory
::
TruncateStreamFactory
(
ClusterPtr
&
cluster_
,
String
&
storage_path_
)
:
cluster
(
cluster_
),
storage_path
(
storage_path_
)
{}
void
TruncateStreamFactory
::
createForShard
(
const
Cluster
::
ShardInfo
&
shard_info
,
const
String
&
query
,
const
ASTPtr
&
query_ast
,
const
ThrottlerPtr
&
throttler
,
Context
&
context
,
BlockInputStreams
&
res
)
{
/// TODO remove temporary
// removeTemporaryDir(shard_info);
if
(
shard_info
.
isLocal
())
{
InterpreterDropQuery
drop_query
{
query_ast
,
context
};
BlockIO
drop_res
=
drop_query
.
execute
();
if
(
drop_res
.
in
)
res
.
emplace_back
(
std
::
move
(
drop_res
.
in
));
}
if
(
!
shard_info
.
hasInternalReplication
()
||
!
shard_info
.
isLocal
())
{
Cluster
::
Addresses
replicas
=
getShardReplicasAddresses
(
shard_info
);
for
(
size_t
replica_index
:
ext
::
range
(
0
,
replicas
.
size
()))
{
if
(
!
replicas
[
replica_index
].
is_local
)
{
if
(
const
auto
&
connection_pool
=
shard_info
.
per_replica_pools
.
at
(
replica_index
))
{
auto
entry
=
connection_pool
->
get
(
&
context
.
getSettingsRef
());
auto
remote_stream
=
std
::
make_shared
<
RemoteBlockInputStream
>
(
*
entry
,
query
,
Block
{},
context
,
nullptr
,
throttler
);
remote_stream
->
setPoolMode
(
PoolMode
::
GET_ONE
);
remote_stream
->
appendExtraInfo
();
res
.
emplace_back
(
std
::
move
(
remote_stream
));
if
(
shard_info
.
hasInternalReplication
())
break
;
}
throw
Exception
(
"Connection pool for replica "
+
replicas
[
replica_index
].
readableString
()
+
" does not exist"
,
ErrorCodes
::
LOGICAL_ERROR
);
}
}
}
}
void
TruncateStreamFactory
::
removeTemporaryDir
(
const
Cluster
::
ShardInfo
&
shard_info
)
const
{
if
(
!
shard_info
.
hasInternalReplication
())
{
Cluster
::
Addresses
addresses
=
cluster
->
getShardsAddresses
().
at
(
shard_info
.
shard_num
);
for
(
auto
&
address
:
addresses
)
{
auto
temporary_file
=
Poco
::
File
(
storage_path
+
"/"
+
address
.
toStringFull
());
if
(
temporary_file
.
exists
())
temporary_file
.
remove
(
true
);
}
return
;
}
if
(
!
shard_info
.
dir_name_for_internal_replication
.
empty
())
{
auto
temporary_file
=
Poco
::
File
(
storage_path
+
"/"
+
shard_info
.
dir_name_for_internal_replication
);
if
(
temporary_file
.
exists
())
temporary_file
.
remove
(
true
);
}
}
Cluster
::
Addresses
TruncateStreamFactory
::
getShardReplicasAddresses
(
const
Cluster
::
ShardInfo
&
info
)
{
const
auto
addresses_with_failovers
=
cluster
->
getShardsAddresses
();
return
addresses_with_failovers
[
info
.
shard_num
-
1
];
}
}
}
dbms/src/Interpreters/ClusterProxy/TruncateStreamFactory.h
0 → 100644
浏览文件 @
6613e567
#pragma once
#include <Interpreters/ClusterProxy/IStreamFactory.h>
namespace
DB
{
namespace
ClusterProxy
{
class
TruncateStreamFactory
final
:
public
IStreamFactory
{
public:
TruncateStreamFactory
(
ClusterPtr
&
cluster
,
String
&
storage_path
);
void
createForShard
(
const
Cluster
::
ShardInfo
&
shard_info
,
const
String
&
query
,
const
ASTPtr
&
query_ast
,
const
ThrottlerPtr
&
throttler
,
Context
&
context
,
BlockInputStreams
&
res
)
override
;
void
removeTemporaryDir
(
const
Cluster
::
ShardInfo
&
shard_info
)
const
;
private:
ClusterPtr
cluster
;
String
&
storage_path
;
Cluster
::
Addresses
getShardReplicasAddresses
(
const
Cluster
::
ShardInfo
&
info
);
};
}
}
dbms/src/Interpreters/ClusterProxy/executeQuery.cpp
浏览文件 @
6613e567
...
...
@@ -57,7 +57,7 @@ BlockInputStreams executeQuery(
throttler
=
user_level_throttler
;
for
(
const
auto
&
shard_info
:
cluster
->
getShardsInfo
())
stream_factory
.
createForShard
(
shard_info
,
query
,
query_ast
,
new_context
,
throttler
,
res
);
stream_factory
.
createForShard
(
shard_info
,
query
,
query_ast
,
throttler
,
new_context
,
res
);
return
res
;
}
...
...
dbms/src/Interpreters/InterpreterDropQuery.cpp
浏览文件 @
6613e567
...
...
@@ -8,6 +8,8 @@
#include <Storages/IStorage.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Functions/GatherUtils/Algorithms.h>
#include <AggregateFunctions/AggregateFunctionSequenceMatch.h>
namespace
DB
...
...
@@ -19,6 +21,8 @@ namespace ErrorCodes
extern
const
int
DATABASE_NOT_EMPTY
;
extern
const
int
UNKNOWN_DATABASE
;
extern
const
int
READONLY
;
extern
const
int
LOGICAL_ERROR
;
extern
const
int
UNKNOWN_TABLE
;
}
...
...
@@ -34,29 +38,90 @@ BlockIO InterpreterDropQuery::execute()
if
(
!
drop
.
cluster
.
empty
())
return
executeDDLQueryOnCluster
(
query_ptr
,
context
,
{
drop
.
database
});
String
path
=
context
.
getPath
();
String
current_database
=
context
.
getCurrentDatabase
();
if
(
!
drop
.
table
.
empty
())
return
executeToTable
(
drop
.
database
,
drop
.
table
,
drop
.
kind
,
drop
.
if_exists
,
drop
.
temporary
);
else
if
(
!
drop
.
database
.
empty
())
return
executeToDatabase
(
drop
.
database
,
drop
.
kind
,
drop
.
if_exists
);
else
throw
Exception
(
"Database and table names is empty."
,
ErrorCodes
::
LOGICAL_ERROR
);
}
bool
drop_database
=
drop
.
table
.
empty
()
&&
!
drop
.
database
.
empty
();
if
(
drop_database
&&
drop
.
detach
)
BlockIO
InterpreterDropQuery
::
executeToTable
(
String
&
database_name_
,
String
&
table_name
,
ASTDropQuery
::
Kind
kind
,
bool
if_exists
,
bool
if_temporary
)
{
if
(
if_temporary
||
database_name_
.
empty
())
{
auto
database
=
context
.
detachDatabase
(
drop
.
database
);
database
->
shutdown
();
return
{};
auto
&
session_context
=
context
.
hasSessionContext
()
?
context
.
getSessionContext
()
:
context
;
if
(
session_context
.
isExternalTableExist
(
table_name
))
return
executeToTemporaryTable
(
table_name
,
kind
);
}
/// Drop temporary table.
if
(
drop
.
database
.
empty
()
||
drop
.
temporary
)
String
database_name
=
database_name_
.
empty
()
?
context
.
getCurrentDatabase
()
:
database_name_
;
DatabaseAndTable
database_and_table
=
tryGetDatabaseAndTable
(
database_name
,
table_name
,
if_exists
);
if
(
database_and_table
.
first
&&
database_and_table
.
second
)
{
StoragePtr
table
=
(
context
.
hasSessionContext
()
?
context
.
getSessionContext
()
:
context
).
tryRemoveExternalTable
(
drop
.
table
);
if
(
table
)
auto
ddl_guard
=
context
.
getDDLGuard
(
database_name
,
table_name
,
"Table "
+
database_name
+
"."
+
table_name
+
" is dropping or detaching right now"
);
if
(
kind
==
ASTDropQuery
::
Kind
::
Detach
)
{
if
(
drop
.
database
.
empty
()
&&
!
drop
.
temporary
)
database_and_table
.
second
->
shutdown
();
/// If table was already dropped by anyone, an exception will be thrown
auto
table_lock
=
database_and_table
.
second
->
lockDataForAlter
(
__PRETTY_FUNCTION__
);
/// Drop table from memory, don't touch data and metadata
database_and_table
.
first
->
detachTable
(
database_and_table
.
second
->
getTableName
());
}
else
if
(
kind
==
ASTDropQuery
::
Kind
::
Truncate
)
{
/// If table was already dropped by anyone, an exception will be thrown
auto
table_lock
=
database_and_table
.
second
->
lockDataForAlter
(
__PRETTY_FUNCTION__
);
/// Drop table data, don't touch metadata
database_and_table
.
second
->
truncate
(
query_ptr
);
}
else
if
(
kind
==
ASTDropQuery
::
Kind
::
Drop
)
{
if
(
!
database_and_table
.
second
->
checkTableCanBeDropped
())
throw
Exception
(
"Table "
+
database_name
+
"."
+
database_and_table
.
second
->
getTableName
()
+
" couldn't be dropped due to failed pre-drop check"
,
ErrorCodes
::
TABLE_WAS_NOT_DROPPED
);
database_and_table
.
second
->
shutdown
();
/// If table was already dropped by anyone, an exception will be thrown
auto
table_lock
=
database_and_table
.
second
->
lockDataForAlter
(
__PRETTY_FUNCTION__
);
/// Delete table metdata and table itself from memory
database_and_table
.
first
->
removeTable
(
context
,
database_and_table
.
second
->
getTableName
());
/// Delete table data
database_and_table
.
second
->
drop
();
database_and_table
.
second
->
is_dropped
=
true
;
String
database_data_path
=
database_and_table
.
first
->
getDataPath
();
/// If it is not virtual database like Dictionary then drop remaining data dir
if
(
!
database_data_path
.
empty
())
{
LOG_WARNING
((
&
Logger
::
get
(
"InterpreterDropQuery"
)),
"It is recommended to use `DROP TEMPORARY TABLE` to delete temporary tables"
);
String
table_data_path
=
database_data_path
+
"/"
+
escapeForFileName
(
database_and_table
.
second
->
getTableName
());
if
(
Poco
::
File
(
table_data_path
).
exists
())
Poco
::
File
(
table_data_path
).
remove
(
true
);
}
}
}
return
{};
}
BlockIO
InterpreterDropQuery
::
executeToTemporaryTable
(
String
&
table_name
,
ASTDropQuery
::
Kind
kind
)
{
if
(
kind
==
ASTDropQuery
::
Kind
::
Detach
)
throw
Exception
(
"Unable to detach temporary table."
,
ErrorCodes
::
SYNTAX_ERROR
);
else
if
(
kind
==
ASTDropQuery
::
Kind
::
Drop
)
{
StoragePtr
table
=
(
context
.
hasSessionContext
()
?
context
.
getSessionContext
()
:
context
).
tryRemoveExternalTable
(
table_name
);
if
(
table
)
{
table
->
shutdown
();
/// If table was already dropped by anyone, an exception will be thrown
auto
table_lock
=
table
->
lockForAlter
(
__PRETTY_FUNCTION__
);
...
...
@@ -67,129 +132,82 @@ BlockIO InterpreterDropQuery::execute()
}
}
String
database_name
=
drop
.
database
.
empty
()
?
current_database
:
drop
.
database
;
String
database_name_escaped
=
escapeForFileName
(
database_name
);
String
metadata_path
=
path
+
"metadata/"
+
database_name_escaped
+
"/"
;
String
database_metadata_path
=
path
+
"metadata/"
+
database_name_escaped
+
".sql"
;
auto
database
=
context
.
tryGetDatabase
(
database_name
);
if
(
!
database
&&
!
drop
.
if_exists
)
throw
Exception
(
"Database "
+
database_name
+
" doesn't exist"
,
ErrorCodes
::
UNKNOWN_DATABASE
);
std
::
vector
<
std
::
pair
<
StoragePtr
,
std
::
unique_ptr
<
DDLGuard
>>>
tables_to_drop
;
if
(
!
drop_database
)
{
StoragePtr
table
;
if
(
drop
.
if_exists
)
table
=
context
.
tryGetTable
(
database_name
,
drop
.
table
);
else
table
=
context
.
getTable
(
database_name
,
drop
.
table
);
return
{};
}
if
(
table
)
tables_to_drop
.
emplace_back
(
table
,
context
.
getDDLGuard
(
database_name
,
drop
.
table
,
"Table "
+
database_name
+
"."
+
drop
.
table
+
" is dropping or detaching right now"
));
else
return
{};
}
else
BlockIO
InterpreterDropQuery
::
executeToDatabase
(
String
&
database_name
,
ASTDropQuery
::
Kind
kind
,
bool
if_exists
)
{
if
(
auto
database
=
tryGetDatabase
(
database_name
,
if_exists
))
{
if
(
!
databas
e
)
if
(
kind
==
ASTDropQuery
::
Kind
::
Truncat
e
)
{
if
(
!
drop
.
if_exists
)
throw
Exception
(
"Database "
+
database_name
+
" doesn't exist"
,
ErrorCodes
::
UNKNOWN_DATABASE
);
return
{};
throw
Exception
(
"Unable to truncate database."
,
ErrorCodes
::
SYNTAX_ERROR
);
}
for
(
auto
iterator
=
database
->
getIterator
(
context
);
iterator
->
isValid
();
iterator
->
next
())
tables_to_drop
.
emplace_back
(
iterator
->
table
(),
context
.
getDDLGuard
(
database_name
,
iterator
->
name
(),
"Table "
+
database_name
+
"."
+
iterator
->
name
()
+
" is dropping or detaching right now"
));
}
for
(
auto
&
table
:
tables_to_drop
)
{
if
(
!
drop
.
detach
)
else
if
(
kind
==
ASTDropQuery
::
Kind
::
Detach
)
{
if
(
!
table
.
first
->
checkTableCanBeDropped
())
throw
Exception
(
"Table "
+
database_name
+
"."
+
table
.
first
->
getTableName
()
+
" couldn't be dropped due to failed pre-drop check"
,
ErrorCodes
::
TABLE_WAS_NOT_DROPPED
);
context
.
detachDatabase
(
database_name
);
database
->
shutdown
();
}
else
if
(
kind
==
ASTDropQuery
::
Kind
::
Drop
)
{
for
(
auto
iterator
=
database
->
getIterator
(
context
);
iterator
->
isValid
();
iterator
->
next
())
{
String
current_table_name
=
iterator
->
table
()
->
getTableName
();
executeToTable
(
database_name
,
current_table_name
,
kind
,
false
,
false
);
}
table
.
first
->
shutdown
();
auto
context_lock
=
context
.
getLock
();
/// If table was already dropped by anyone, an exception will be thrown
auto
table_lock
=
table
.
first
->
lockForAlter
(
__PRETTY_FUNCTION__
);
/// Someone could have time to delete the database before us.
context
.
assertDatabaseExists
(
database_name
);
String
current_table_name
=
table
.
first
->
getTableName
();
/// Someone could have time to create a table in the database to be deleted while we deleted the tables without the context lock.
if
(
!
context
.
getDatabase
(
database_name
)
->
empty
(
context
))
throw
Exception
(
"New table appeared in database being dropped. Try dropping it again."
,
ErrorCodes
::
DATABASE_NOT_EMPTY
);
if
(
drop
.
detach
)
{
/// Drop table from memory, don't touch data and metadata
database
->
detachTable
(
current_table_name
);
}
else
{
/// Delete table metdata and table itself from memory
database
->
removeTable
(
context
,
current_table_name
);
/// Delete table data
table
.
first
->
drop
();
/// Delete database information from the RAM
context
.
detachDatabase
(
database_name
);
table
.
first
->
is_dropped
=
true
;
database
->
shutdown
();
/// Delete the database.
database
->
drop
();
/// Remove data directory if it is not virtual database. TODO: should IDatabase::drop() do that?
String
database_data_path
=
database
->
getDataPath
();
/// If it is not virtual database like Dictionary then drop remaining data dir
if
(
!
database_data_path
.
empty
())
{
String
table_data_path
=
database_data_path
+
"/"
+
escapeForFileName
(
current_table_name
);
Poco
::
File
(
database_data_path
).
remove
(
false
);
if
(
Poco
::
File
(
table_data_path
).
exists
())
Poco
::
File
(
table_data_path
).
remove
(
true
);
}
/// Old ClickHouse versions did not store database.sql files
Poco
::
File
database_metadata_file
(
context
.
getPath
()
+
"metadata/"
+
escapeForFileName
(
database_name
)
+
".sql"
);
if
(
database_metadata_file
.
exists
())
database_metadata_file
.
remove
(
false
);
}
}
if
(
drop_database
)
{
/// Delete the database. The tables in it have already been deleted.
auto
lock
=
context
.
getLock
();
/// Someone could have time to delete the database before us.
context
.
assertDatabaseExists
(
database_name
);
/// Someone could have time to create a table in the database to be deleted while we deleted the tables without the context lock.
if
(
!
context
.
getDatabase
(
database_name
)
->
empty
(
context
))
throw
Exception
(
"New table appeared in database being dropped. Try dropping it again."
,
ErrorCodes
::
DATABASE_NOT_EMPTY
);
/// Delete database information from the RAM
auto
database
=
context
.
detachDatabase
(
database_name
);
return
{};
}
/// Delete the database.
database
->
drop
();
DatabasePtr
InterpreterDropQuery
::
tryGetDatabase
(
String
&
database_name
,
bool
if_exists
)
{
return
if_exists
?
context
.
tryGetDatabase
(
database_name
)
:
context
.
getDatabase
(
database_name
);
}
/// Remove data directory if it is not virtual database. TODO: should IDatabase::drop() do that?
String
database_data_path
=
database
->
getDataPath
();
if
(
!
database_data_path
.
empty
())
Poco
::
File
(
database_data_path
).
remove
(
false
);
DatabaseAndTable
InterpreterDropQuery
::
tryGetDatabaseAndTable
(
String
&
database_name
,
String
&
table_name
,
bool
if_exists
)
{
DatabasePtr
database
=
tryGetDatabase
(
database_name
,
if_exists
);
Poco
::
File
(
metadata_path
).
remove
(
false
);
if
(
database
)
{
StoragePtr
table
=
database
->
tryGetTable
(
context
,
table_name
);
if
(
!
table
&&
!
if_exists
)
throw
Exception
(
"Table "
+
backQuoteIfNeed
(
database_name
)
+
"."
+
backQuoteIfNeed
(
table_name
)
+
" doesn't exist."
,
ErrorCodes
::
UNKNOWN_TABLE
);
/// Old ClickHouse versions did not store database.sql files
Poco
::
File
database_metadata_file
(
database_metadata_path
);
if
(
database_metadata_file
.
exists
())
database_metadata_file
.
remove
(
false
);
return
std
::
make_pair
<
DatabasePtr
,
StoragePtr
>
(
std
::
move
(
database
),
std
::
move
(
table
));
}
return
{};
}
void
InterpreterDropQuery
::
checkAccess
(
const
ASTDropQuery
&
drop
)
{
const
Settings
&
settings
=
context
.
getSettingsRef
();
...
...
dbms/src/Interpreters/InterpreterDropQuery.h
浏览文件 @
6613e567
...
...
@@ -2,6 +2,8 @@
#include <Interpreters/IInterpreter.h>
#include <Parsers/ASTDropQuery.h>
#include <Databases/IDatabase.h>
namespace
DB
...
...
@@ -9,7 +11,7 @@ namespace DB
class
Context
;
class
IAST
;
using
ASTPtr
=
std
::
shared_ptr
<
IAST
>
;
using
DatabaseAndTable
=
std
::
pair
<
DatabasePtr
,
StoragePtr
>
;
/** Allow to either drop table with all its data (DROP), or remove information about table (just forget) from server (DETACH).
*/
...
...
@@ -25,5 +27,15 @@ private:
void
checkAccess
(
const
ASTDropQuery
&
drop
);
ASTPtr
query_ptr
;
Context
&
context
;
BlockIO
executeToDatabase
(
String
&
database_name
,
ASTDropQuery
::
Kind
kind
,
bool
if_exists
);
BlockIO
executeToTable
(
String
&
database_name
,
String
&
table_name
,
ASTDropQuery
::
Kind
kind
,
bool
if_exists
,
bool
if_temporary
);
DatabasePtr
tryGetDatabase
(
String
&
database_name
,
bool
exists
);
DatabaseAndTable
tryGetDatabaseAndTable
(
String
&
database_name
,
String
&
table_name
,
bool
if_exists
);
BlockIO
executeToTemporaryTable
(
String
&
table_name
,
ASTDropQuery
::
Kind
kind
);
};
}
dbms/src/Parsers/ASTDropQuery.h
浏览文件 @
6613e567
...
...
@@ -3,6 +3,7 @@
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <AggregateFunctions/AggregateFunctionSequenceMatch.h>
namespace
DB
{
...
...
@@ -13,14 +14,31 @@ namespace DB
class
ASTDropQuery
:
public
ASTQueryWithOutput
,
public
ASTQueryWithOnCluster
{
public:
bool
detach
{
false
};
/// DETACH query, not DROP.
enum
Kind
{
Drop
,
Detach
,
Truncate
,
};
Kind
kind
;
bool
if_exists
{
false
};
bool
temporary
{
false
};
String
database
;
String
table
;
/** Get the text that identifies this element. */
String
getID
()
const
override
{
return
(
detach
?
"DetachQuery_"
:
"DropQuery_"
)
+
database
+
"_"
+
table
;
};
String
getID
()
const
override
{
if
(
kind
==
ASTDropQuery
::
Kind
::
Drop
)
return
"DropQuery_"
+
database
+
"_"
+
table
;
else
if
(
kind
==
ASTDropQuery
::
Kind
::
Detach
)
return
"DetachQuery_"
+
database
+
"_"
+
table
;
else
if
(
kind
==
ASTDropQuery
::
Kind
::
Truncate
)
return
"TruncateQuery_"
+
database
+
"_"
+
table
;
else
throw
Exception
(
"Not supported kind of drop query."
,
ErrorCodes
::
SYNTAX_ERROR
);
};
ASTPtr
clone
()
const
override
{
...
...
@@ -46,19 +64,33 @@ protected:
{
if
(
table
.
empty
()
&&
!
database
.
empty
())
{
settings
.
ostr
<<
(
settings
.
hilite
?
hilite_keyword
:
""
)
<<
(
detach
?
"DETACH DATABASE "
:
"DROP DATABASE "
)
<<
(
if_exists
?
"IF EXISTS "
:
""
)
<<
(
settings
.
hilite
?
hilite_none
:
""
)
<<
backQuoteIfNeed
(
database
);
settings
.
ostr
<<
(
settings
.
hilite
?
hilite_keyword
:
""
);
if
(
kind
==
ASTDropQuery
::
Kind
::
Drop
)
settings
.
ostr
<<
"DROP DATABASE "
;
else
if
(
kind
==
ASTDropQuery
::
Kind
::
Detach
)
settings
.
ostr
<<
"DETACH DATABASE "
;
else
if
(
kind
==
ASTDropQuery
::
Kind
::
Truncate
)
settings
.
ostr
<<
"TRUNCATE DATABASE "
;
else
throw
Exception
(
"Not supported kind of drop query."
,
ErrorCodes
::
SYNTAX_ERROR
);
settings
.
ostr
<<
(
if_exists
?
"IF EXISTS "
:
""
)
<<
(
settings
.
hilite
?
hilite_none
:
""
)
<<
backQuoteIfNeed
(
database
);
formatOnCluster
(
settings
);
}
else
{
settings
.
ostr
<<
(
settings
.
hilite
?
hilite_keyword
:
""
)
<<
(
detach
?
"DETACH TABLE "
:
"DROP TABLE "
)
<<
(
if_exists
?
"IF EXISTS "
:
""
)
<<
(
settings
.
hilite
?
hilite_none
:
""
)
<<
(
!
database
.
empty
()
?
backQuoteIfNeed
(
database
)
+
"."
:
""
)
<<
backQuoteIfNeed
(
table
);
settings
.
ostr
<<
(
settings
.
hilite
?
hilite_keyword
:
""
);
if
(
kind
==
ASTDropQuery
::
Kind
::
Drop
)
settings
.
ostr
<<
"DROP TABLE "
;
else
if
(
kind
==
ASTDropQuery
::
Kind
::
Detach
)
settings
.
ostr
<<
"DETACH TABLE "
;
else
if
(
kind
==
ASTDropQuery
::
Kind
::
Truncate
)
settings
.
ostr
<<
"TRUNCATE TABLE "
;
else
throw
Exception
(
"Not supported kind of drop query."
,
ErrorCodes
::
SYNTAX_ERROR
);
settings
.
ostr
<<
(
if_exists
?
"IF EXISTS "
:
""
)
<<
(
settings
.
hilite
?
hilite_none
:
""
)
<<
(
!
database
.
empty
()
?
backQuoteIfNeed
(
database
)
+
"."
:
""
)
<<
backQuoteIfNeed
(
table
);
formatOnCluster
(
settings
);
}
}
...
...
dbms/src/Parsers/ParserDropQuery.cpp
浏览文件 @
6613e567
...
...
@@ -15,6 +15,42 @@ bool ParserDropQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword
s_drop
(
"DROP"
);
ParserKeyword
s_detach
(
"DETACH"
);
ParserKeyword
s_truncate
(
"TRUNCATE"
);
if
(
s_drop
.
ignore
(
pos
,
expected
))
return
parseDropQuery
(
pos
,
node
,
expected
);
else
if
(
s_detach
.
ignore
(
pos
,
expected
))
return
parseDetachQuery
(
pos
,
node
,
expected
);
else
if
(
s_truncate
.
ignore
(
pos
,
expected
))
return
parseTruncateQuery
(
pos
,
node
,
expected
);
else
return
false
;
}
bool
ParserDropQuery
::
parseDetachQuery
(
Pos
&
pos
,
ASTPtr
&
node
,
Expected
&
expected
)
{
if
(
parseDropQuery
(
pos
,
node
,
expected
))
{
ASTDropQuery
*
drop_query
=
static_cast
<
ASTDropQuery
*>
(
node
.
get
());
drop_query
->
kind
=
ASTDropQuery
::
Kind
::
Detach
;
return
true
;
}
return
false
;
}
bool
ParserDropQuery
::
parseTruncateQuery
(
Pos
&
pos
,
ASTPtr
&
node
,
Expected
&
expected
)
{
if
(
parseDropQuery
(
pos
,
node
,
expected
))
{
ASTDropQuery
*
drop_query
=
static_cast
<
ASTDropQuery
*>
(
node
.
get
());
drop_query
->
kind
=
ASTDropQuery
::
Kind
::
Truncate
;
return
true
;
}
return
false
;
}
bool
ParserDropQuery
::
parseDropQuery
(
Pos
&
pos
,
ASTPtr
&
node
,
Expected
&
expected
)
{
ParserKeyword
s_temporary
(
"TEMPORARY"
);
ParserKeyword
s_table
(
"TABLE"
);
ParserKeyword
s_database
(
"DATABASE"
);
...
...
@@ -25,18 +61,9 @@ bool ParserDropQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ASTPtr
database
;
ASTPtr
table
;
String
cluster_str
;
bool
detach
=
false
;
bool
if_exists
=
false
;
bool
temporary
=
false
;
if
(
!
s_drop
.
ignore
(
pos
,
expected
))
{
if
(
s_detach
.
ignore
(
pos
,
expected
))
detach
=
true
;
else
return
false
;
}
if
(
s_database
.
ignore
(
pos
,
expected
))
{
if
(
s_if_exists
.
ignore
(
pos
,
expected
))
...
...
@@ -82,7 +109,7 @@ bool ParserDropQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
auto
query
=
std
::
make_shared
<
ASTDropQuery
>
();
node
=
query
;
query
->
detach
=
detach
;
query
->
kind
=
ASTDropQuery
::
Kind
::
Drop
;
query
->
if_exists
=
if_exists
;
query
->
temporary
=
temporary
;
if
(
database
)
...
...
@@ -94,5 +121,4 @@ bool ParserDropQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return
true
;
}
}
dbms/src/Parsers/ParserDropQuery.h
浏览文件 @
6613e567
...
...
@@ -8,7 +8,7 @@ namespace DB
{
/** Query like this:
* DROP|DETACH TABLE [IF EXISTS] [db.]name
* DROP|DETACH
|TRUNCATE
TABLE [IF EXISTS] [db.]name
*
* Or:
* DROP DATABASE [IF EXISTS] db
...
...
@@ -18,6 +18,10 @@ class ParserDropQuery : public IParserBase
protected:
const
char
*
getName
()
const
{
return
"DROP query"
;
}
bool
parseImpl
(
Pos
&
pos
,
ASTPtr
&
node
,
Expected
&
expected
);
bool
parseDropQuery
(
Pos
&
pos
,
ASTPtr
&
node
,
Expected
&
expected
);
bool
parseDetachQuery
(
Pos
&
pos
,
ASTPtr
&
node
,
Expected
&
expected
);
bool
parseTruncateQuery
(
Pos
&
pos
,
ASTPtr
&
node
,
Expected
&
expected
);
};
}
dbms/src/Storages/Distributed/DirectoryMonitor.cpp
浏览文件 @
6613e567
...
...
@@ -105,12 +105,31 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(StorageDi
StorageDistributedDirectoryMonitor
::~
StorageDistributedDirectoryMonitor
()
{
if
(
!
quit
)
{
quit
=
true
;
std
::
lock_guard
<
std
::
mutex
>
lock
{
mutex
};
{
quit
=
true
;
std
::
lock_guard
<
std
::
mutex
>
lock
{
mutex
};
}
cond
.
notify_one
();
thread
.
join
();
}
cond
.
notify_one
();
thread
.
join
();
}
void
StorageDistributedDirectoryMonitor
::
shutdownAndDropAllData
()
{
if
(
!
quit
)
{
{
quit
=
true
;
std
::
lock_guard
<
std
::
mutex
>
lock
{
mutex
};
}
cond
.
notify_one
();
thread
.
join
();
}
Poco
::
File
(
path
).
remove
(
true
);
}
...
...
@@ -210,7 +229,6 @@ bool StorageDistributedDirectoryMonitor::findFiles()
return
true
;
}
void
StorageDistributedDirectoryMonitor
::
processFile
(
const
std
::
string
&
file_path
)
{
LOG_TRACE
(
log
,
"Started processing `"
<<
file_path
<<
'`'
);
...
...
@@ -407,6 +425,7 @@ struct StorageDistributedDirectoryMonitor::Batch
}
};
void
StorageDistributedDirectoryMonitor
::
processFilesWithBatching
(
const
std
::
map
<
UInt64
,
std
::
string
>
&
files
)
{
std
::
unordered_set
<
UInt64
>
file_indices_to_skip
;
...
...
@@ -489,7 +508,6 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
Poco
::
File
{
current_batch_file_path
}.
remove
();
}
bool
StorageDistributedDirectoryMonitor
::
isFileBrokenErrorCode
(
int
code
)
{
return
code
==
ErrorCodes
::
CHECKSUM_DOESNT_MATCH
...
...
@@ -524,7 +542,6 @@ bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & f
return
false
;
}
std
::
string
StorageDistributedDirectoryMonitor
::
getLoggerName
()
const
{
return
storage
.
table_name
+
'.'
+
storage
.
getName
()
+
".DirectoryMonitor"
;
...
...
dbms/src/Storages/Distributed/DirectoryMonitor.h
浏览文件 @
6613e567
...
...
@@ -22,6 +22,7 @@ public:
static
ConnectionPoolPtr
createPool
(
const
std
::
string
&
name
,
const
StorageDistributed
&
storage
);
void
shutdownAndDropAllData
();
private:
void
run
();
bool
findFiles
();
...
...
dbms/src/Storages/IStorage.h
浏览文件 @
6613e567
...
...
@@ -200,6 +200,14 @@ public:
*/
virtual
void
drop
()
{}
/** Delete the table data. Called before deleting the directory with the data.
* If you do not need any action other than deleting the directory with data, you can leave this method blank.
*/
virtual
void
truncate
(
const
ASTPtr
&
/*query*/
)
{
throw
Exception
(
"Truncate is not supported by storage "
+
getName
(),
ErrorCodes
::
NOT_IMPLEMENTED
);
}
/** Rename the table.
* Renaming a name in a file with metadata, the name in the list of tables in the RAM, is done separately.
* In this function, you need to rename the directory with the data, if any.
...
...
dbms/src/Storages/MergeTree/MergeTreeData.cpp
浏览文件 @
6613e567
...
...
@@ -797,7 +797,16 @@ void MergeTreeData::dropAllData()
LOG_TRACE
(
log
,
"dropAllData: removing data from filesystem."
);
Poco
::
File
(
full_path
).
remove
(
true
);
std
::
vector
<
Poco
::
File
>
data_dirs
;
Poco
::
File
(
full_path
).
list
(
data_dirs
);
auto
detached_file
=
Poco
::
Path
(
full_path
+
"/detached"
).
makeAbsolute
().
toString
();
for
(
auto
&
data_dir
:
data_dirs
)
{
if
(
Poco
::
Path
(
data_dir
.
path
()).
makeAbsolute
().
toString
()
!=
detached_file
)
data_dir
.
remove
(
true
);
}
LOG_TRACE
(
log
,
"dropAllData: done."
);
}
...
...
@@ -1571,6 +1580,7 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part_to_detach, cons
part
->
renameAddPrefix
(
move_to_detached
,
prefix
);
data_parts_indexes
.
erase
(
it_part
);
std
::
cout
<<
"MergeTreeData::renameAndDetachPart
\n
"
;
if
(
restore_covered
&&
part
->
info
.
level
==
0
)
{
LOG_WARNING
(
log
,
"Will not recover parts covered by zero-level part "
<<
part
->
name
);
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
浏览文件 @
6613e567
...
...
@@ -157,7 +157,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
auto
first_outdated_block_time_threshold
=
std
::
upper_bound
(
timed_blocks
.
begin
(),
timed_blocks
.
end
(),
block_threshold
,
NodeWithStat
::
greaterByTime
);
auto
first_outdated_block
=
std
::
min
(
first_outdated_block_fixed_threshold
,
first_outdated_block_time_threshold
);
std
::
vector
<
std
::
pair
<
String
,
std
::
future
<
zkutil
::
RemoveResponse
>>
>
try_remove_futures
;
zkutil
::
MultiAsyncResponse
<
zkutil
::
RemoveResponse
>
try_remove_futures
;
for
(
auto
it
=
first_outdated_block
;
it
!=
timed_blocks
.
end
();
++
it
)
{
String
path
=
storage
.
zookeeper_path
+
"/blocks/"
+
it
->
node
;
...
...
@@ -212,7 +212,7 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
<<
" to clear old ones from ZooKeeper."
);
}
std
::
vector
<
std
::
pair
<
String
,
std
::
future
<
zkutil
::
ExistsResponse
>>
>
exists_futures
;
zkutil
::
MultiAsyncResponse
<
zkutil
::
ExistsResponse
>
exists_futures
;
for
(
const
String
&
block
:
blocks
)
{
auto
it
=
cached_block_stats
.
find
(
block
);
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
浏览文件 @
6613e567
...
...
@@ -53,7 +53,7 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
std
::
sort
(
children
.
begin
(),
children
.
end
());
std
::
vector
<
std
::
pair
<
String
,
std
::
future
<
zkutil
::
GetResponse
>>
>
futures
;
zkutil
::
MultiAsyncResponse
<
zkutil
::
GetResponse
>
futures
;
futures
.
reserve
(
children
.
size
());
for
(
const
String
&
child
:
children
)
...
...
@@ -315,7 +315,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, B
LOG_DEBUG
(
log
,
"Pulling "
<<
(
end
-
begin
)
<<
" entries to queue: "
<<
*
begin
<<
" - "
<<
*
last
);
std
::
vector
<
std
::
pair
<
String
,
std
::
future
<
zkutil
::
GetResponse
>>
>
futures
;
zkutil
::
MultiAsyncResponse
<
zkutil
::
GetResponse
>
futures
;
futures
.
reserve
(
end
-
begin
);
for
(
auto
it
=
begin
;
it
!=
end
;
++
it
)
...
...
dbms/src/Storages/StorageDistributed.cpp
浏览文件 @
6613e567
...
...
@@ -40,6 +40,8 @@
#include <boost/filesystem.hpp>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Interpreters/ClusterProxy/TruncateStreamFactory.h>
namespace
DB
...
...
@@ -296,6 +298,30 @@ BlockInputStreams StorageDistributed::describe(const Context & context, const Se
describe_stream_factory
,
cluster
,
describe_query
,
context
,
settings
);
}
void
StorageDistributed
::
truncate
(
const
ASTPtr
&
query
)
{
ClusterPtr
cluster
=
getCluster
();
ASTPtr
ast_drop_query
=
query
->
clone
();
ASTDropQuery
&
drop_query
=
typeid_cast
<
ASTDropQuery
&>
(
*
ast_drop_query
);
drop_query
.
table
=
remote_table
;
drop_query
.
database
=
remote_database
;
{
std
::
lock_guard
lock
(
cluster_nodes_mutex
);
for
(
auto
it
=
cluster_nodes_data
.
begin
();
it
!=
cluster_nodes_data
.
end
();)
{
it
->
second
.
shutdownAndDropAllData
();
it
=
cluster_nodes_data
.
erase
(
it
);
}
}
String
storage_path
=
getDataPath
();
ClusterProxy
::
TruncateStreamFactory
truncate_stream_factory
(
cluster
,
storage_path
);
ClusterProxy
::
executeQuery
(
truncate_stream_factory
,
cluster
,
ast_drop_query
,
context
,
context
.
getSettingsRef
());
}
NameAndTypePair
StorageDistributed
::
getColumn
(
const
String
&
column_name
)
const
{
...
...
@@ -364,6 +390,11 @@ void StorageDistributed::ClusterNodeData::requireDirectoryMonitor(const std::str
directory_monitor
=
std
::
make_unique
<
StorageDistributedDirectoryMonitor
>
(
storage
,
name
,
conneciton_pool
);
}
void
StorageDistributed
::
ClusterNodeData
::
shutdownAndDropAllData
()
{
directory_monitor
->
shutdownAndDropAllData
();
}
void
registerStorageDistributed
(
StorageFactory
&
factory
)
{
...
...
dbms/src/Storages/StorageDistributed.h
浏览文件 @
6613e567
...
...
@@ -63,6 +63,9 @@ public:
BlockOutputStreamPtr
write
(
const
ASTPtr
&
query
,
const
Settings
&
settings
)
override
;
void
drop
()
override
{}
void
truncate
(
const
ASTPtr
&
query
)
override
;
void
rename
(
const
String
&
/*new_path_to_db*/
,
const
String
&
/*new_database_name*/
,
const
String
&
new_table_name
)
override
{
table_name
=
new_table_name
;
}
/// in the sub-tables, you need to manually add and delete columns
/// the structure of the sub-table is not checked
...
...
@@ -121,6 +124,8 @@ public:
void
requireConnectionPool
(
const
std
::
string
&
name
,
const
StorageDistributed
&
storage
);
/// Creates directory_monitor if not exists.
void
requireDirectoryMonitor
(
const
std
::
string
&
name
,
StorageDistributed
&
storage
);
void
shutdownAndDropAllData
();
};
std
::
unordered_map
<
std
::
string
,
ClusterNodeData
>
cluster_nodes_data
;
std
::
mutex
cluster_nodes_mutex
;
...
...
dbms/src/Storages/StorageLog.cpp
浏览文件 @
6613e567
...
...
@@ -102,6 +102,7 @@ private:
FileStreams
streams
;
void
readData
(
const
String
&
name
,
const
IDataType
&
type
,
IColumn
&
column
,
size_t
max_rows_to_read
);
};
...
...
@@ -462,6 +463,29 @@ void StorageLog::rename(const String & new_path_to_db, const String & /*new_data
marks_file
=
Poco
::
File
(
path
+
escapeForFileName
(
name
)
+
'/'
+
DBMS_STORAGE_LOG_MARKS_FILE_NAME
);
}
void
StorageLog
::
truncate
(
const
ASTPtr
&
/*query*/
)
{
std
::
shared_lock
<
std
::
shared_mutex
>
lock
(
rwlock
);
String
table_dir
=
path
+
escapeForFileName
(
name
);
this
->
files
.
clear
();
this
->
file_count
=
0
;
this
->
loaded_marks
=
false
;
std
::
vector
<
Poco
::
File
>
data_files
;
Poco
::
File
(
table_dir
).
list
(
data_files
);
for
(
auto
&
file
:
data_files
)
file
.
remove
(
false
);
for
(
const
auto
&
column
:
getColumns
().
getAllPhysical
())
addFiles
(
column
.
name
,
*
column
.
type
);
this
->
file_checker
=
FileChecker
{
table_dir
+
"/"
+
"sizes.json"
};
this
->
marks_file
=
Poco
::
File
(
table_dir
+
"/"
+
DBMS_STORAGE_LOG_MARKS_FILE_NAME
);
}
const
StorageLog
::
Marks
&
StorageLog
::
getMarksWithRealRowCount
()
const
{
...
...
@@ -486,7 +510,6 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount() const
return
it
->
second
.
marks
;
}
BlockInputStreams
StorageLog
::
read
(
const
Names
&
column_names
,
const
SelectQueryInfo
&
/*query_info*/
,
...
...
@@ -533,7 +556,6 @@ BlockInputStreams StorageLog::read(
return
res
;
}
BlockOutputStreamPtr
StorageLog
::
write
(
const
ASTPtr
&
/*query*/
,
const
Settings
&
/*settings*/
)
{
...
...
dbms/src/Storages/StorageLog.h
浏览文件 @
6613e567
...
...
@@ -40,6 +40,8 @@ public:
bool
checkData
()
const
override
;
void
truncate
(
const
ASTPtr
&
query
)
override
;
std
::
string
full_path
()
const
{
return
path
+
escapeForFileName
(
name
)
+
'/'
;}
String
getDataPath
()
const
override
{
return
full_path
();
}
...
...
dbms/src/Storages/StorageMaterializedView.cpp
浏览文件 @
6613e567
...
...
@@ -198,12 +198,27 @@ void StorageMaterializedView::drop()
auto
drop_query
=
std
::
make_shared
<
ASTDropQuery
>
();
drop_query
->
database
=
target_database_name
;
drop_query
->
table
=
target_table_name
;
drop_query
->
kind
=
ASTDropQuery
::
Kind
::
Drop
;
ASTPtr
ast_drop_query
=
drop_query
;
InterpreterDropQuery
drop_interpreter
(
ast_drop_query
,
global_context
);
drop_interpreter
.
execute
();
}
}
void
StorageMaterializedView
::
truncate
(
const
ASTPtr
&
query
)
{
if
(
has_inner_table
&&
global_context
.
tryGetTable
(
target_database_name
,
target_table_name
))
{
ASTPtr
ast_drop_query
=
query
->
clone
();
ASTDropQuery
&
drop_query
=
typeid_cast
<
ASTDropQuery
&>
(
*
ast_drop_query
);
drop_query
.
database
=
target_database_name
;
drop_query
.
table
=
target_table_name
;
InterpreterDropQuery
drop_interpreter
(
ast_drop_query
,
global_context
);
drop_interpreter
.
execute
();
}
}
void
StorageMaterializedView
::
checkStatementCanBeForwarded
()
const
{
if
(
!
has_inner_table
)
...
...
dbms/src/Storages/StorageMaterializedView.h
浏览文件 @
6613e567
...
...
@@ -31,6 +31,8 @@ public:
BlockOutputStreamPtr
write
(
const
ASTPtr
&
query
,
const
Settings
&
settings
)
override
;
void
drop
()
override
;
void
truncate
(
const
ASTPtr
&
query
)
override
;
bool
optimize
(
const
ASTPtr
&
query
,
const
ASTPtr
&
partition
,
bool
final
,
bool
deduplicate
,
const
Context
&
context
)
override
;
void
dropPartition
(
const
ASTPtr
&
query
,
const
ASTPtr
&
partition
,
bool
detach
,
const
Context
&
context
)
override
;
...
...
dbms/src/Storages/StorageMemory.cpp
浏览文件 @
6613e567
...
...
@@ -128,6 +128,12 @@ void StorageMemory::drop()
data
.
clear
();
}
void
StorageMemory
::
truncate
(
const
ASTPtr
&
/*query*/
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex
);
data
.
clear
();
}
void
registerStorageMemory
(
StorageFactory
&
factory
)
{
...
...
dbms/src/Storages/StorageMemory.h
浏览文件 @
6613e567
...
...
@@ -39,6 +39,9 @@ public:
BlockOutputStreamPtr
write
(
const
ASTPtr
&
query
,
const
Settings
&
settings
)
override
;
void
drop
()
override
;
virtual
void
truncate
(
const
ASTPtr
&
query
)
override
;
void
rename
(
const
String
&
/*new_path_to_db*/
,
const
String
&
/*new_database_name*/
,
const
String
&
new_table_name
)
override
{
table_name
=
new_table_name
;
}
private:
...
...
dbms/src/Storages/StorageMergeTree.cpp
浏览文件 @
6613e567
...
...
@@ -124,6 +124,17 @@ void StorageMergeTree::drop()
{
shutdown
();
data
.
dropAllData
();
Poco
::
File
(
full_path
).
remove
(
true
);
}
void
StorageMergeTree
::
truncate
(
const
ASTPtr
&
/*query*/
)
{
merger
.
merges_blocker
.
cancelForever
();
data
.
dropAllData
();
/// reset block id
increment
.
set
(
0
);
data
.
insert_increment
.
set
(
0
);
}
void
StorageMergeTree
::
rename
(
const
String
&
new_path_to_db
,
const
String
&
/*new_database_name*/
,
const
String
&
new_table_name
)
...
...
@@ -139,6 +150,7 @@ void StorageMergeTree::rename(const String & new_path_to_db, const String & /*ne
/// NOTE: Logger names are not updated.
}
void
StorageMergeTree
::
alter
(
const
AlterCommands
&
params
,
const
String
&
database_name
,
...
...
@@ -257,7 +269,6 @@ struct CurrentlyMergingPartsTagger
}
};
bool
StorageMergeTree
::
merge
(
size_t
aio_threshold
,
bool
aggressive
,
...
...
@@ -374,6 +385,7 @@ bool StorageMergeTree::merge(
return
true
;
}
bool
StorageMergeTree
::
mergeTask
()
{
if
(
shutdown_called
)
...
...
@@ -545,7 +557,6 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool part, cons
context
.
dropCaches
();
}
void
StorageMergeTree
::
freezePartition
(
const
ASTPtr
&
partition
,
const
String
&
with_name
,
const
Context
&
context
)
{
data
.
freezePartition
(
partition
,
with_name
,
context
);
...
...
dbms/src/Storages/StorageMergeTree.h
浏览文件 @
6613e567
...
...
@@ -73,6 +73,8 @@ public:
void
drop
()
override
;
void
truncate
(
const
ASTPtr
&
/*query*/
)
override
;
void
rename
(
const
String
&
new_path_to_db
,
const
String
&
new_database_name
,
const
String
&
new_table_name
)
override
;
void
alter
(
const
AlterCommands
&
params
,
const
String
&
database_name
,
const
String
&
table_name
,
const
Context
&
context
)
override
;
...
...
dbms/src/Storages/StorageReplicatedMergeTree.cpp
浏览文件 @
6613e567
...
...
@@ -2856,50 +2856,44 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & query, const ASTPt
String
partition_id
=
data
.
getPartitionIDFromQuery
(
partition
,
context
);
Int64
min_block
=
0
;
Int64
max_block
=
0
;
String
fake_part_name
=
getFakePartNameCoveringAllPartsInPartition
(
partition_id
,
&
min_block
,
&
max_block
);
LogEntry
entry
;
if
(
fake_part_name
.
empty
(
))
if
(
dropBlocksInPartition
(
*
zookeeper
,
partition_id
,
entry
,
detach
))
{
LOG_INFO
(
log
,
"Will not drop partition "
<<
partition_id
<<
", it is empty."
);
return
;
/// If necessary, wait until the operation is performed on itself or on all replicas.
if
(
context
.
getSettingsRef
().
replication_alter_partitions_sync
!=
0
)
{
if
(
context
.
getSettingsRef
().
replication_alter_partitions_sync
==
1
)
waitForReplicaToProcessLogEntry
(
replica_name
,
entry
);
else
waitForAllReplicasToProcessLogEntry
(
entry
);
}
}
}
clearBlocksInPartition
(
*
zookeeper
,
partition_id
,
min_block
,
max_block
);
void
StorageReplicatedMergeTree
::
truncate
(
const
ASTPtr
&
query
)
{
assertNotReadonly
();
/** Forbid to choose the parts to be deleted for merging.
* Invariant: after the `DROP_RANGE` entry appears in the log, merge of deleted parts will not appear in the log.
*/
zkutil
::
ZooKeeperPtr
zookeeper
=
getZooKeeper
();
if
(
!
is_leader
)
{
s
td
::
lock_guard
<
std
::
mutex
>
merge_selecting_lock
(
merge_selecting_mutex
);
queue
.
disableMergesInRange
(
fake_part_name
)
;
s
endRequestToLeaderReplica
(
query
,
context
.
getSettingsRef
()
);
return
;
}
LOG_DEBUG
(
log
,
"Disabled merges covered by range "
<<
fake_part_name
);
Strings
partitions
=
zookeeper
->
getChildren
(
zookeeper_path
+
"/block_numbers"
);
/// Finally, having achieved the necessary invariants, you can put an entry in the log.
LogEntry
entry
;
entry
.
type
=
LogEntry
::
DROP_RANGE
;
entry
.
source_replica
=
replica_name
;
entry
.
new_part_name
=
fake_part_name
;
entry
.
detach
=
detach
;
entry
.
create_time
=
time
(
nullptr
);
String
log_znode_path
=
zookeeper
->
create
(
zookeeper_path
+
"/log/log-"
,
entry
.
toString
(),
zkutil
::
CreateMode
::
PersistentSequential
);
entry
.
znode_name
=
log_znode_path
.
substr
(
log_znode_path
.
find_last_of
(
'/'
)
+
1
);
/// If necessary, wait until the operation is performed on itself or on all replicas.
if
(
context
.
getSettingsRef
().
replication_alter_partitions_sync
!=
0
)
for
(
String
&
partition_id
:
partitions
)
{
if
(
context
.
getSettingsRef
().
replication_alter_partitions_sync
==
1
)
waitForReplicaToProcessLogEntry
(
replica_name
,
entry
);
else
LogEntry
entry
;
if
(
dropBlocksInPartition
(
*
zookeeper
,
partition_id
,
entry
,
false
))
waitForAllReplicasToProcessLogEntry
(
entry
);
}
}
void
StorageReplicatedMergeTree
::
attachPartition
(
const
ASTPtr
&
partition
,
bool
attach_part
,
const
Context
&
context
)
{
assertNotReadonly
();
...
...
@@ -3742,7 +3736,7 @@ void StorageReplicatedMergeTree::clearBlocksInPartition(
throw
Exception
(
zookeeper_path
+
"/blocks doesn't exist"
,
ErrorCodes
::
NOT_FOUND_NODE
);
String
partition_prefix
=
partition_id
+
"_"
;
std
::
vector
<
std
::
pair
<
String
,
std
::
future
<
zkutil
::
GetResponse
>>
>
get_futures
;
zkutil
::
MultiAsyncResponse
<
zkutil
::
GetResponse
>
get_futures
;
for
(
const
String
&
block_id
:
blocks
)
{
if
(
startsWith
(
block_id
,
partition_prefix
))
...
...
@@ -3752,7 +3746,7 @@ void StorageReplicatedMergeTree::clearBlocksInPartition(
}
}
std
::
vector
<
std
::
pair
<
String
,
std
::
future
<
zkutil
::
RemoveResponse
>>
>
to_delete_futures
;
zkutil
::
MultiAsyncResponse
<
zkutil
::
RemoveResponse
>
to_delete_futures
;
for
(
auto
&
pair
:
get_futures
)
{
const
String
&
path
=
pair
.
first
;
...
...
@@ -3808,14 +3802,18 @@ void ReplicatedMergeTreeMergeSelectingThread::clearState()
{
deduplicate
=
false
;
/// TODO: read deduplicate option from table config
uncached_merging_predicate
=
[
this
](
const
MergeTreeData
::
DataPartPtr
&
left
,
const
MergeTreeData
::
DataPartPtr
&
right
)
uncached_merging_predicate
=
[
this
](
const
MergeTreeData
::
DataPartPtr
&
left
,
const
MergeTreeData
::
DataPartPtr
&
right
)
{
return
canMergePartsAccordingToZooKeeperInfo
(
left
,
right
,
storage
->
getZooKeeper
(),
storage
->
zookeeper_path
,
storage
->
data
);
return
canMergePartsAccordingToZooKeeperInfo
(
left
,
right
,
storage
->
getZooKeeper
(),
storage
->
zookeeper_path
,
storage
->
data
);
};
merging_predicate_args_to_key
=
[](
const
MergeTreeData
::
DataPartPtr
&
left
,
const
MergeTreeData
::
DataPartPtr
&
right
)
merging_predicate_args_to_key
=
[](
const
MergeTreeData
::
DataPartPtr
&
left
,
const
MergeTreeData
::
DataPartPtr
&
right
)
{
return
std
::
make_pair
(
left
->
name
,
right
->
name
);
return
std
::
make_pair
(
left
->
name
,
right
->
name
);
};
cached_merging_predicate
.
reset
(
new
CachedMergingPredicate
<
std
::
pair
<
std
::
string
,
std
::
string
>>
());
...
...
@@ -3824,11 +3822,49 @@ void ReplicatedMergeTreeMergeSelectingThread::clearState()
now
=
std
::
chrono
::
steady_clock
::
time_point
();
can_merge
=
[
&
]
(
const
MergeTreeData
::
DataPartPtr
&
left
,
const
MergeTreeData
::
DataPartPtr
&
right
,
String
*
)
can_merge
=
[
&
]
(
const
MergeTreeData
::
DataPartPtr
&
left
,
const
MergeTreeData
::
DataPartPtr
&
right
,
String
*
)
{
return
partsWillNotBeMergedOrDisabled
(
left
,
right
,
storage
->
queue
)
&&
cached_merging_predicate
->
get
(
now
,
uncached_merging_predicate
,
merging_predicate_args_to_key
,
left
,
right
);
return
partsWillNotBeMergedOrDisabled
(
left
,
right
,
storage
->
queue
)
&&
cached_merging_predicate
->
get
(
now
,
uncached_merging_predicate
,
merging_predicate_args_to_key
,
left
,
right
);
};
}
bool
StorageReplicatedMergeTree
::
dropBlocksInPartition
(
zkutil
::
ZooKeeper
&
zookeeper
,
String
&
partition_id
,
StorageReplicatedMergeTree
::
LogEntry
&
entry
,
bool
detach
)
{
Int64
min_block
=
0
;
Int64
max_block
=
0
;
String
fake_part_name
=
getFakePartNameCoveringAllPartsInPartition
(
partition_id
,
&
min_block
,
&
max_block
);
if
(
fake_part_name
.
empty
())
{
LOG_INFO
(
log
,
"Will not drop partition "
<<
partition_id
<<
", it is empty."
);
return
false
;
}
clearBlocksInPartition
(
zookeeper
,
partition_id
,
min_block
,
max_block
);
/** Forbid to choose the parts to be deleted for merging.
* Invariant: after the `DROP_RANGE` entry appears in the log, merge of deleted parts will not appear in the log.
*/
{
std
::
lock_guard
<
std
::
mutex
>
merge_selecting_lock
(
merge_selecting_mutex
);
queue
.
disableMergesInRange
(
fake_part_name
);
}
LOG_DEBUG
(
log
,
"Disabled merges covered by range "
<<
fake_part_name
);
/// Finally, having achieved the necessary invariants, you can put an entry in the log.
entry
.
type
=
LogEntry
::
DROP_RANGE
;
entry
.
source_replica
=
replica_name
;
entry
.
new_part_name
=
fake_part_name
;
entry
.
detach
=
detach
;
entry
.
create_time
=
time
(
nullptr
);
String
log_znode_path
=
zookeeper
.
create
(
zookeeper_path
+
"/log/log-"
,
entry
.
toString
(),
zkutil
::
CreateMode
::
PersistentSequential
);
entry
.
znode_name
=
log_znode_path
.
substr
(
log_znode_path
.
find_last_of
(
'/'
)
+
1
);
return
true
;
}
}
dbms/src/Storages/StorageReplicatedMergeTree.h
浏览文件 @
6613e567
...
...
@@ -125,6 +125,8 @@ public:
*/
void
drop
()
override
;
void
truncate
(
const
ASTPtr
&
query
)
override
;
void
rename
(
const
String
&
new_path_to_db
,
const
String
&
new_database_name
,
const
String
&
new_table_name
)
override
;
bool
supportsIndexForIn
()
const
override
{
return
true
;
}
...
...
@@ -457,6 +459,9 @@ private:
/// Info about how other replicas can access this one.
ReplicatedMergeTreeAddress
getReplicatedMergeTreeAddress
()
const
;
bool
dropBlocksInPartition
(
zkutil
::
ZooKeeper
&
zookeeper
,
String
&
partition_id
,
StorageReplicatedMergeTree
::
LogEntry
&
entry
,
bool
detach
);
protected:
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
...
...
@@ -476,6 +481,7 @@ protected:
const
MergeTreeData
::
MergingParams
&
merging_params_
,
const
MergeTreeSettings
&
settings_
,
bool
has_force_restore_data_flag
);
};
...
...
dbms/src/Storages/StorageSet.cpp
浏览文件 @
6613e567
...
...
@@ -116,7 +116,23 @@ StorageSet::StorageSet(
void
StorageSet
::
insertBlock
(
const
Block
&
block
)
{
set
->
insertFromBlock
(
block
,
/*fill_set_elements=*/
false
);
}
size_t
StorageSet
::
getSize
()
const
{
return
set
->
getTotalRowCount
();
};
size_t
StorageSet
::
getSize
()
const
{
return
set
->
getTotalRowCount
();
}
void
StorageSet
::
truncate
(
const
ASTPtr
&
/*query*/
)
{
increment
=
0
;
set
=
std
::
make_shared
<
Set
>
(
SizeLimits
());
static
const
auto
file_suffix
=
".bin"
;
Poco
::
DirectoryIterator
dir_end
;
for
(
Poco
::
DirectoryIterator
dir_it
(
path
);
dir_end
!=
dir_it
;
++
dir_it
)
{
const
auto
&
name
=
dir_it
.
name
();
if
(
dir_it
->
isFile
()
&&
endsWith
(
name
,
file_suffix
)
&&
dir_it
->
getSize
()
>
0
)
dir_it
->
remove
(
false
);
}
};
void
StorageSetOrJoinBase
::
restore
()
...
...
dbms/src/Storages/StorageSet.h
浏览文件 @
6613e567
...
...
@@ -65,6 +65,8 @@ public:
/// Access the insides.
SetPtr
&
getSet
()
{
return
set
;
}
void
truncate
(
const
ASTPtr
&
query
)
override
;
private:
SetPtr
set
;
...
...
dbms/src/Storages/StorageStripeLog.cpp
浏览文件 @
6613e567
...
...
@@ -30,6 +30,7 @@
#include <Storages/StorageStripeLog.h>
#include <Storages/StorageFactory.h>
#include <Poco/DirectoryIterator.h>
namespace
DB
...
...
@@ -287,6 +288,19 @@ bool StorageStripeLog::checkData() const
return
file_checker
.
check
();
}
void
StorageStripeLog
::
truncate
(
const
ASTPtr
&
/*query*/
)
{
std
::
shared_lock
<
std
::
shared_mutex
>
lock
(
rwlock
);
String
table_dir
=
path
+
escapeForFileName
(
name
);
Poco
::
DirectoryIterator
dir_end
;
for
(
auto
dir_it
=
Poco
::
DirectoryIterator
(
table_dir
);
dir_it
!=
dir_end
;
++
dir_it
)
dir_it
->
remove
(
false
);
this
->
file_checker
=
FileChecker
{
table_dir
+
"/"
+
"sizes.json"
};
}
void
registerStorageStripeLog
(
StorageFactory
&
factory
)
{
...
...
dbms/src/Storages/StorageStripeLog.h
浏览文件 @
6613e567
...
...
@@ -53,6 +53,8 @@ public:
String
getDataPath
()
const
override
{
return
full_path
();
}
void
truncate
(
const
ASTPtr
&
query
)
override
;
private:
String
path
;
String
name
;
...
...
dbms/src/Storages/StorageTinyLog.cpp
浏览文件 @
6613e567
...
...
@@ -365,6 +365,21 @@ bool StorageTinyLog::checkData() const
return
file_checker
.
check
();
}
void
StorageTinyLog
::
truncate
(
const
ASTPtr
&
/*query*/
)
{
String
table_dir
=
path
+
escapeForFileName
(
name
);
Poco
::
DirectoryIterator
dir_end
;
for
(
auto
dir_it
=
Poco
::
DirectoryIterator
(
table_dir
);
dir_it
!=
dir_end
;
++
dir_it
)
dir_it
->
remove
(
false
);
this
->
files
.
clear
();
this
->
file_checker
=
FileChecker
{
table_dir
+
"/"
+
"sizes.json"
};
for
(
const
auto
&
column
:
getColumns
().
getAllPhysical
())
addFiles
(
column
.
name
,
*
column
.
type
);
}
void
registerStorageTinyLog
(
StorageFactory
&
factory
)
{
...
...
dbms/src/Storages/StorageTinyLog.h
浏览文件 @
6613e567
...
...
@@ -52,6 +52,8 @@ public:
String
getDataPath
()
const
override
{
return
full_path
();
}
void
truncate
(
const
ASTPtr
&
query
)
override
;
private:
String
path
;
String
name
;
...
...
dbms/tests/queries/0_stateless/00623_replicated_truncate_table.reference
0 → 100644
浏览文件 @
6613e567
======Before Truncate======
2015-01-01 10 42
2015-01-01 10 42
======After Truncate And Empty======
======After Truncate And Insert Data======
2015-01-01 10 42
2015-01-01 10 42
dbms/tests/queries/0_stateless/00623_replicated_truncate_table.sql
0 → 100644
浏览文件 @
6613e567
DROP
TABLE
IF
EXISTS
test
.
replicated_truncate1
;
DROP
TABLE
IF
EXISTS
test
.
replicated_truncate2
;
CREATE
TABLE
test
.
replicated_truncate1
(
d
Date
,
k
UInt64
,
i32
Int32
)
ENGINE
=
ReplicatedMergeTree
(
'/clickhouse/tables/test/truncate'
,
'r1'
,
d
,
k
,
8192
);
CREATE
TABLE
test
.
replicated_truncate2
(
d
Date
,
k
UInt64
,
i32
Int32
)
ENGINE
=
ReplicatedMergeTree
(
'/clickhouse/tables/test/truncate'
,
'r2'
,
d
,
k
,
8192
);
SELECT
'======Before Truncate======'
;
INSERT
INTO
test
.
replicated_truncate1
VALUES
(
'2015-01-01'
,
10
,
42
);
SELECT
*
FROM
test
.
replicated_truncate1
ORDER
BY
k
;
SELECT
*
FROM
test
.
replicated_truncate2
ORDER
BY
k
;
SELECT
'======After Truncate And Empty======'
;
TRUNCATE
TABLE
test
.
replicated_truncate1
;
SELECT
*
FROM
test
.
replicated_truncate1
ORDER
BY
k
;
SELECT
*
FROM
test
.
replicated_truncate2
ORDER
BY
k
;
SELECT
'======After Truncate And Insert Data======'
;
INSERT
INTO
test
.
replicated_truncate1
VALUES
(
'2015-01-01'
,
10
,
42
);
SELECT
*
FROM
test
.
replicated_truncate1
ORDER
BY
k
;
SELECT
*
FROM
test
.
replicated_truncate2
ORDER
BY
k
;
DROP
TABLE
IF
EXISTS
test
.
replicated_truncate1
;
DROP
TABLE
IF
EXISTS
test
.
replicated_truncate2
;
dbms/tests/queries/0_stateless/00623_truncate_table.reference
0 → 100644
浏览文件 @
6613e567
======Before Truncate======
1
1
1
1
1
2000-01-01 1
2000-01-01 1
1 hello
======After Truncate And Empty======
0
======After Truncate And Insert Data======
1
1
1
1
1
2000-01-01 1
2000-01-01 1
1 hello
dbms/tests/queries/0_stateless/00623_truncate_table.sql
0 → 100644
浏览文件 @
6613e567
DROP
DATABASE
IF
EXISTS
truncate_test
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_log
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_memory
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_tiny_log
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_stripe_log
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_merge_tree
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_materialized_view
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_materialized_depend
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_distributed_depend
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_distributed
;
CREATE
DATABASE
truncate_test
;
CREATE
TABLE
truncate_test
.
test_set
(
id
UInt64
)
ENGINE
=
Set
;
CREATE
TABLE
truncate_test
.
test_log
(
id
UInt64
)
ENGINE
=
Log
;
CREATE
TABLE
truncate_test
.
test_memory
(
id
UInt64
)
ENGINE
=
Memory
;
CREATE
TABLE
truncate_test
.
test_tiny_log
(
id
UInt64
)
ENGINE
=
TinyLog
;
CREATE
TABLE
truncate_test
.
test_stripe_log
(
id
UInt64
)
ENGINE
=
StripeLog
;
CREATE
TABLE
truncate_test
.
test_merge_tree
(
p
Date
,
k
UInt64
)
ENGINE
=
MergeTree
(
p
,
k
,
1
);
CREATE
TABLE
truncate_test
.
test_materialized_depend
(
p
Date
,
k
UInt64
)
ENGINE
=
Null
;
CREATE
MATERIALIZED
VIEW
truncate_test
.
test_materialized_view
ENGINE
=
MergeTree
(
p
,
k
,
1
)
AS
SELECT
*
FROM
truncate_test
.
test_materialized_depend
;
CREATE
TABLE
truncate_test
.
test_distributed_depend
(
x
UInt64
,
s
String
)
ENGINE
=
MergeTree
ORDER
BY
x
;
CREATE
TABLE
truncate_test
.
test_distributed
AS
truncate_test
.
test_distributed_depend
ENGINE
=
Distributed
(
test_shard_localhost
,
truncate_test
,
test_distributed_depend
);
SELECT
'======Before Truncate======'
;
INSERT
INTO
truncate_test
.
test_set
VALUES
(
0
);
INSERT
INTO
truncate_test
.
test_log
VALUES
(
1
);
INSERT
INTO
truncate_test
.
test_memory
VALUES
(
1
);
INSERT
INTO
truncate_test
.
test_tiny_log
VALUES
(
1
);
INSERT
INTO
truncate_test
.
test_stripe_log
VALUES
(
1
);
INSERT
INTO
truncate_test
.
test_merge_tree
VALUES
(
'2000-01-01'
,
1
);
INSERT
INTO
truncate_test
.
test_materialized_depend
VALUES
(
'2000-01-01'
,
1
);
INSERT
INTO
truncate_test
.
test_distributed_depend
VALUES
(
1
,
'hello'
);
SELECT
*
FROM
system
.
numbers
WHERE
number
NOT
IN
truncate_test
.
test_set
LIMIT
1
;
SELECT
*
FROM
truncate_test
.
test_log
;
SELECT
*
FROM
truncate_test
.
test_memory
;
SELECT
*
FROM
truncate_test
.
test_tiny_log
;
SELECT
*
FROM
truncate_test
.
test_stripe_log
;
SELECT
*
FROM
truncate_test
.
test_merge_tree
;
SELECT
*
FROM
truncate_test
.
test_materialized_view
;
SELECT
*
FROM
truncate_test
.
test_distributed
;
SELECT
'======After Truncate And Empty======'
;
TRUNCATE
TABLE
truncate_test
.
test_set
;
TRUNCATE
TABLE
truncate_test
.
test_log
;
TRUNCATE
TABLE
truncate_test
.
test_memory
;
TRUNCATE
TABLE
truncate_test
.
test_tiny_log
;
TRUNCATE
TABLE
truncate_test
.
test_stripe_log
;
TRUNCATE
TABLE
truncate_test
.
test_merge_tree
;
TRUNCATE
TABLE
truncate_test
.
test_materialized_view
;
TRUNCATE
TABLE
truncate_test
.
test_distributed
;
SELECT
*
FROM
system
.
numbers
WHERE
number
NOT
IN
truncate_test
.
test_set
LIMIT
1
;
SELECT
*
FROM
truncate_test
.
test_log
;
SELECT
*
FROM
truncate_test
.
test_memory
;
SELECT
*
FROM
truncate_test
.
test_tiny_log
;
SELECT
*
FROM
truncate_test
.
test_stripe_log
;
SELECT
*
FROM
truncate_test
.
test_merge_tree
;
SELECT
*
FROM
truncate_test
.
test_materialized_view
;
SELECT
*
FROM
truncate_test
.
test_distributed
;
SELECT
'======After Truncate And Insert Data======'
;
INSERT
INTO
truncate_test
.
test_set
VALUES
(
0
);
INSERT
INTO
truncate_test
.
test_log
VALUES
(
1
);
INSERT
INTO
truncate_test
.
test_memory
VALUES
(
1
);
INSERT
INTO
truncate_test
.
test_tiny_log
VALUES
(
1
);
INSERT
INTO
truncate_test
.
test_stripe_log
VALUES
(
1
);
INSERT
INTO
truncate_test
.
test_merge_tree
VALUES
(
'2000-01-01'
,
1
);
INSERT
INTO
truncate_test
.
test_materialized_depend
VALUES
(
'2000-01-01'
,
1
);
INSERT
INTO
truncate_test
.
test_distributed_depend
VALUES
(
1
,
'hello'
);
SELECT
*
FROM
system
.
numbers
WHERE
number
NOT
IN
truncate_test
.
test_set
LIMIT
1
;
SELECT
*
FROM
truncate_test
.
test_log
;
SELECT
*
FROM
truncate_test
.
test_memory
;
SELECT
*
FROM
truncate_test
.
test_tiny_log
;
SELECT
*
FROM
truncate_test
.
test_stripe_log
;
SELECT
*
FROM
truncate_test
.
test_merge_tree
;
SELECT
*
FROM
truncate_test
.
test_materialized_view
;
SELECT
*
FROM
truncate_test
.
test_distributed
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_set
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_log
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_memory
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_tiny_log
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_stripe_log
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_merge_tree
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_materialized_view
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_materialized_depend
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_distributed
;
DROP
TABLE
IF
EXISTS
truncate_test
.
test_distributed_depend
;
DROP
DATABASE
IF
EXISTS
truncate_test
;
dbms/tests/queries/0_stateless/00623_truncate_table_throw_exception.reference
0 → 100644
浏览文件 @
6613e567
========Before Truncate========
test_string
========Execute Truncate========
1
========After Truncate========
test_string
dbms/tests/queries/0_stateless/00623_truncate_table_throw_exception.sh
0 → 100755
浏览文件 @
6613e567
#!/usr/bin/env bash
CURDIR
=
$(
cd
"
$(
dirname
"
${
BASH_SOURCE
[0]
}
"
)
"
&&
pwd
)
.
$CURDIR
/../shell_config.sh
${
CLICKHOUSE_CLIENT
}
--query
"DROP DATABASE IF EXISTS test_truncate;"
${
CLICKHOUSE_CLIENT
}
--query
"CREATE DATABASE test_truncate;"
${
CLICKHOUSE_CLIENT
}
--query
"SELECT '========Before Truncate========';"
${
CLICKHOUSE_CLIENT
}
--query
"CREATE TABLE test_truncate.test_view_depend (s String) ENGINE = Log;"
${
CLICKHOUSE_CLIENT
}
--query
"CREATE VIEW test_truncate.test_view AS SELECT * FROM test_truncate.test_view_depend;"
${
CLICKHOUSE_CLIENT
}
--query
"INSERT INTO test_truncate.test_view_depend VALUES('test_string');"
${
CLICKHOUSE_CLIENT
}
--query
"SELECT * FROM test_truncate.test_view;"
${
CLICKHOUSE_CLIENT
}
--query
"SELECT '========Execute Truncate========';"
echo
`
${
CLICKHOUSE_CLIENT
}
--query
"TRUNCATE TABLE test_truncate.test_view;"
2>&1 |
grep
-c
"Code: 48.*Truncate is not supported by storage View"
`
${
CLICKHOUSE_CLIENT
}
--query
"SELECT '========After Truncate========';"
${
CLICKHOUSE_CLIENT
}
--query
"SELECT * FROM test_truncate.test_view;"
${
CLICKHOUSE_CLIENT
}
--query
"DROP DATABASE IF EXISTS test_truncate;"
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录