Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
cf5c998b
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
cf5c998b
编写于
12月 03, 2019
作者:
A
Alexander Tokmakov
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor storage name
上级
abcc7d30
变更
78
隐藏空白更改
内联
并排
Showing
78 changed file
with
230 addition
and
364 deletion
+230
-364
dbms/src/Interpreters/Context.h
dbms/src/Interpreters/Context.h
+1
-0
dbms/src/Interpreters/tests/in_join_subqueries_preprocessor.cpp
...rc/Interpreters/tests/in_join_subqueries_preprocessor.cpp
+1
-4
dbms/src/Storages/Distributed/DirectoryMonitor.cpp
dbms/src/Storages/Distributed/DirectoryMonitor.cpp
+1
-1
dbms/src/Storages/IStorage.cpp
dbms/src/Storages/IStorage.cpp
+23
-1
dbms/src/Storages/IStorage.h
dbms/src/Storages/IStorage.h
+55
-6
dbms/src/Storages/Kafka/StorageKafka.cpp
dbms/src/Storages/Kafka/StorageKafka.cpp
+13
-20
dbms/src/Storages/Kafka/StorageKafka.h
dbms/src/Storages/Kafka/StorageKafka.h
+1
-7
dbms/src/Storages/LiveView/ProxyStorage.h
dbms/src/Storages/LiveView/ProxyStorage.h
+1
-2
dbms/src/Storages/LiveView/StorageLiveView.cpp
dbms/src/Storages/LiveView/StorageLiveView.cpp
+12
-9
dbms/src/Storages/LiveView/StorageLiveView.h
dbms/src/Storages/LiveView/StorageLiveView.h
+0
-4
dbms/src/Storages/MergeTree/MergeTreeData.cpp
dbms/src/Storages/MergeTree/MergeTreeData.cpp
+15
-13
dbms/src/Storages/MergeTree/MergeTreeData.h
dbms/src/Storages/MergeTree/MergeTreeData.h
+0
-5
dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp
...src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp
+1
-1
dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
...c/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
+1
-1
dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
...Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
+1
-1
dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
...torages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
+1
-1
dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h
dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h
+2
-3
dbms/src/Storages/StorageBuffer.cpp
dbms/src/Storages/StorageBuffer.cpp
+3
-3
dbms/src/Storages/StorageBuffer.h
dbms/src/Storages/StorageBuffer.h
+0
-11
dbms/src/Storages/StorageDictionary.cpp
dbms/src/Storages/StorageDictionary.cpp
+1
-2
dbms/src/Storages/StorageDictionary.h
dbms/src/Storages/StorageDictionary.h
+0
-4
dbms/src/Storages/StorageDistributed.cpp
dbms/src/Storages/StorageDistributed.cpp
+7
-8
dbms/src/Storages/StorageDistributed.h
dbms/src/Storages/StorageDistributed.h
+0
-4
dbms/src/Storages/StorageFile.cpp
dbms/src/Storages/StorageFile.cpp
+8
-8
dbms/src/Storages/StorageFile.h
dbms/src/Storages/StorageFile.h
+0
-4
dbms/src/Storages/StorageHDFS.cpp
dbms/src/Storages/StorageHDFS.cpp
+2
-9
dbms/src/Storages/StorageHDFS.h
dbms/src/Storages/StorageHDFS.h
+0
-6
dbms/src/Storages/StorageInput.cpp
dbms/src/Storages/StorageInput.cpp
+1
-1
dbms/src/Storages/StorageInput.h
dbms/src/Storages/StorageInput.h
+0
-2
dbms/src/Storages/StorageJoin.cpp
dbms/src/Storages/StorageJoin.cpp
+1
-1
dbms/src/Storages/StorageLog.cpp
dbms/src/Storages/StorageLog.cpp
+2
-3
dbms/src/Storages/StorageLog.h
dbms/src/Storages/StorageLog.h
+0
-5
dbms/src/Storages/StorageMaterializedView.cpp
dbms/src/Storages/StorageMaterializedView.cpp
+13
-12
dbms/src/Storages/StorageMaterializedView.h
dbms/src/Storages/StorageMaterializedView.h
+0
-4
dbms/src/Storages/StorageMemory.cpp
dbms/src/Storages/StorageMemory.cpp
+1
-1
dbms/src/Storages/StorageMemory.h
dbms/src/Storages/StorageMemory.h
+0
-11
dbms/src/Storages/StorageMerge.cpp
dbms/src/Storages/StorageMerge.cpp
+3
-4
dbms/src/Storages/StorageMerge.h
dbms/src/Storages/StorageMerge.h
+0
-10
dbms/src/Storages/StorageMergeTree.cpp
dbms/src/Storages/StorageMergeTree.cpp
+9
-4
dbms/src/Storages/StorageMergeTree.h
dbms/src/Storages/StorageMergeTree.h
+0
-2
dbms/src/Storages/StorageMySQL.cpp
dbms/src/Storages/StorageMySQL.cpp
+1
-2
dbms/src/Storages/StorageMySQL.h
dbms/src/Storages/StorageMySQL.h
+0
-4
dbms/src/Storages/StorageNull.cpp
dbms/src/Storages/StorageNull.cpp
+2
-4
dbms/src/Storages/StorageNull.h
dbms/src/Storages/StorageNull.h
+1
-11
dbms/src/Storages/StorageReplicatedMergeTree.cpp
dbms/src/Storages/StorageReplicatedMergeTree.cpp
+23
-15
dbms/src/Storages/StorageReplicatedMergeTree.h
dbms/src/Storages/StorageReplicatedMergeTree.h
+0
-2
dbms/src/Storages/StorageS3.cpp
dbms/src/Storages/StorageS3.cpp
+1
-9
dbms/src/Storages/StorageS3.h
dbms/src/Storages/StorageS3.h
+0
-9
dbms/src/Storages/StorageSet.cpp
dbms/src/Storages/StorageSet.cpp
+2
-3
dbms/src/Storages/StorageSet.h
dbms/src/Storages/StorageSet.h
+0
-5
dbms/src/Storages/StorageStripeLog.cpp
dbms/src/Storages/StorageStripeLog.cpp
+4
-4
dbms/src/Storages/StorageStripeLog.h
dbms/src/Storages/StorageStripeLog.h
+0
-4
dbms/src/Storages/StorageTinyLog.cpp
dbms/src/Storages/StorageTinyLog.cpp
+3
-4
dbms/src/Storages/StorageTinyLog.h
dbms/src/Storages/StorageTinyLog.h
+0
-5
dbms/src/Storages/StorageURL.cpp
dbms/src/Storages/StorageURL.cpp
+1
-7
dbms/src/Storages/StorageURL.h
dbms/src/Storages/StorageURL.h
+0
-7
dbms/src/Storages/StorageValues.cpp
dbms/src/Storages/StorageValues.cpp
+1
-1
dbms/src/Storages/StorageValues.h
dbms/src/Storages/StorageValues.h
+0
-4
dbms/src/Storages/StorageView.cpp
dbms/src/Storages/StorageView.cpp
+1
-1
dbms/src/Storages/StorageView.h
dbms/src/Storages/StorageView.h
+0
-10
dbms/src/Storages/System/IStorageSystemOneBlock.h
dbms/src/Storages/System/IStorageSystemOneBlock.h
+1
-7
dbms/src/Storages/System/StorageSystemColumns.cpp
dbms/src/Storages/System/StorageSystemColumns.cpp
+1
-1
dbms/src/Storages/System/StorageSystemColumns.h
dbms/src/Storages/System/StorageSystemColumns.h
+0
-5
dbms/src/Storages/System/StorageSystemDetachedParts.cpp
dbms/src/Storages/System/StorageSystemDetachedParts.cpp
+1
-2
dbms/src/Storages/System/StorageSystemDisks.cpp
dbms/src/Storages/System/StorageSystemDisks.cpp
+1
-1
dbms/src/Storages/System/StorageSystemDisks.h
dbms/src/Storages/System/StorageSystemDisks.h
+0
-5
dbms/src/Storages/System/StorageSystemNumbers.cpp
dbms/src/Storages/System/StorageSystemNumbers.cpp
+1
-1
dbms/src/Storages/System/StorageSystemNumbers.h
dbms/src/Storages/System/StorageSystemNumbers.h
+0
-3
dbms/src/Storages/System/StorageSystemOne.cpp
dbms/src/Storages/System/StorageSystemOne.cpp
+1
-1
dbms/src/Storages/System/StorageSystemOne.h
dbms/src/Storages/System/StorageSystemOne.h
+0
-5
dbms/src/Storages/System/StorageSystemPartsBase.cpp
dbms/src/Storages/System/StorageSystemPartsBase.cpp
+1
-1
dbms/src/Storages/System/StorageSystemPartsBase.h
dbms/src/Storages/System/StorageSystemPartsBase.h
+0
-5
dbms/src/Storages/System/StorageSystemReplicas.cpp
dbms/src/Storages/System/StorageSystemReplicas.cpp
+1
-1
dbms/src/Storages/System/StorageSystemReplicas.h
dbms/src/Storages/System/StorageSystemReplicas.h
+0
-5
dbms/src/Storages/System/StorageSystemStoragePolicies.cpp
dbms/src/Storages/System/StorageSystemStoragePolicies.cpp
+1
-1
dbms/src/Storages/System/StorageSystemStoragePolicies.h
dbms/src/Storages/System/StorageSystemStoragePolicies.h
+0
-5
dbms/src/Storages/System/StorageSystemTables.cpp
dbms/src/Storages/System/StorageSystemTables.cpp
+1
-1
dbms/src/Storages/System/StorageSystemTables.h
dbms/src/Storages/System/StorageSystemTables.h
+0
-5
未找到文件。
dbms/src/Interpreters/Context.h
浏览文件 @
cf5c998b
...
...
@@ -91,6 +91,7 @@ class CompiledExpressionCache;
#endif
/// (database name, table name)
//FIXME replace with StorageID
using
DatabaseAndTableName
=
std
::
pair
<
String
,
String
>
;
/// Table -> set of table-views that make SELECT from it.
...
...
dbms/src/Interpreters/tests/in_join_subqueries_preprocessor.cpp
浏览文件 @
cf5c998b
...
...
@@ -38,12 +38,9 @@ public:
std
::
string
getRemoteDatabaseName
()
const
{
return
remote_database
;
}
std
::
string
getRemoteTableName
()
const
{
return
remote_table
;
}
std
::
string
getTableName
()
const
override
{
return
""
;
}
std
::
string
getDatabaseName
()
const
override
{
return
""
;
}
protected:
StorageDistributedFake
(
const
std
::
string
&
remote_database_
,
const
std
::
string
&
remote_table_
,
size_t
shard_count_
)
:
remote_database
(
remote_database_
),
remote_table
(
remote_table_
),
shard_count
(
shard_count_
)
:
IStorage
({
""
,
""
}),
remote_database
(
remote_database_
),
remote_table
(
remote_table_
),
shard_count
(
shard_count_
)
{
}
...
...
dbms/src/Storages/Distributed/DirectoryMonitor.cpp
浏览文件 @
cf5c998b
...
...
@@ -642,7 +642,7 @@ bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & f
std
::
string
StorageDistributedDirectoryMonitor
::
getLoggerName
()
const
{
return
storage
.
table_name
+
'.'
+
storage
.
get
Name
()
+
".DirectoryMonitor"
;
return
storage
.
getStorageID
().
getFullTable
Name
()
+
".DirectoryMonitor"
;
}
void
StorageDistributedDirectoryMonitor
::
updatePath
()
...
...
dbms/src/Storages/IStorage.cpp
浏览文件 @
cf5c998b
...
...
@@ -29,7 +29,7 @@ namespace ErrorCodes
extern
const
int
TABLE_IS_DROPPED
;
}
IStorage
::
IStorage
(
ColumnsDescription
virtuals_
)
:
virtuals
(
std
::
move
(
virtuals_
))
IStorage
::
IStorage
(
StorageID
id_
,
ColumnsDescription
virtuals_
)
:
id
(
std
::
move
(
id_
)),
virtuals
(
std
::
move
(
virtuals_
))
{
}
...
...
@@ -463,4 +463,26 @@ DB::CompressionMethod IStorage::chooseCompressionMethod(const String & uri, cons
throw
Exception
(
"Only auto, none, gzip supported as compression method"
,
ErrorCodes
::
NOT_IMPLEMENTED
);
}
StorageID
IStorage
::
getStorageID
(
std
::
unique_lock
<
std
::
mutex
>
*
id_lock
)
const
{
std
::
unique_lock
<
std
::
mutex
>
lock
;
if
(
!
id_lock
)
lock
=
std
::
unique_lock
(
id_mutex
);
else
if
(
!*
id_lock
)
*
id_lock
=
std
::
unique_lock
(
id_mutex
);
return
id
;
}
void
IStorage
::
renameInMemory
(
const
String
&
new_database_name
,
const
String
&
new_table_name
,
std
::
unique_lock
<
std
::
mutex
>
*
id_lock
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
;
if
(
!
id_lock
)
lock
=
std
::
unique_lock
(
id_mutex
);
else
if
(
!*
id_lock
)
*
id_lock
=
std
::
unique_lock
(
id_mutex
);
id
.
database_name
=
new_database_name
;
id
.
table_name
=
new_table_name
;
}
}
dbms/src/Storages/IStorage.h
浏览文件 @
cf5c998b
...
...
@@ -65,6 +65,35 @@ struct ColumnSize
}
};
struct
StorageID
{
String
database_name
;
String
table_name
;
String
uuid
;
StorageID
()
=
delete
;
StorageID
(
const
String
&
database
,
const
String
&
table
,
const
String
&
uuid_
=
{})
:
database_name
(
database
),
table_name
(
table
),
uuid
(
uuid_
)
{}
String
getFullTableName
()
const
{
return
(
database_name
.
empty
()
?
""
:
database_name
+
"."
)
+
table_name
;
}
String
getNameForLogs
()
const
{
return
"`"
+
getFullTableName
()
+
"` (UUID = "
+
uuid
+
")"
;
}
String
getId
()
const
{
//if (uuid.empty())
return
getFullTableName
();
//else
// return uuid;
}
};
/** Storage. Describes the table. Responsible for
* - storage of the table data;
* - the definition in which files (or not in files) the data is stored;
...
...
@@ -75,8 +104,9 @@ struct ColumnSize
class
IStorage
:
public
std
::
enable_shared_from_this
<
IStorage
>
,
public
TypePromotion
<
IStorage
>
{
public:
IStorage
()
=
default
;
explicit
IStorage
(
ColumnsDescription
virtuals_
);
IStorage
()
=
delete
;
explicit
IStorage
(
StorageID
id_
)
:
id
(
std
::
move
(
id_
))
{}
IStorage
(
StorageID
id_
,
ColumnsDescription
virtuals_
);
virtual
~
IStorage
()
=
default
;
IStorage
(
const
IStorage
&
)
=
delete
;
...
...
@@ -86,8 +116,19 @@ public:
virtual
std
::
string
getName
()
const
=
0
;
/// The name of the table.
virtual
std
::
string
getTableName
()
const
=
0
;
virtual
std
::
string
getDatabaseName
()
const
{
return
{};
}
StorageID
getStorageID
(
std
::
unique_lock
<
std
::
mutex
>
*
lock
=
nullptr
)
const
;
// FIXME remove those methods
std
::
string
getTableName
()
const
{
std
::
lock_guard
lock
(
id_mutex
);
return
id
.
table_name
;
}
std
::
string
getDatabaseName
()
const
{
std
::
lock_guard
lock
(
id_mutex
);
return
id
.
database_name
;
}
/// Returns true if the storage receives data from a remote server or servers.
virtual
bool
isRemote
()
const
{
return
false
;
}
...
...
@@ -166,6 +207,8 @@ protected: /// still thread-unsafe part.
IDatabase
::
ASTModifier
getSettingsModifier
(
const
SettingsChanges
&
new_changes
)
const
;
private:
StorageID
id
;
mutable
std
::
mutex
id_mutex
;
ColumnsDescription
columns
;
/// combined real and virtual columns
const
ColumnsDescription
virtuals
=
{};
IndicesDescription
indices
;
...
...
@@ -304,12 +347,18 @@ public:
* In this function, you need to rename the directory with the data, if any.
* Called when the table structure is locked for write.
*/
virtual
void
rename
(
const
String
&
/*new_path_to_table_data*/
,
const
String
&
/*new_database_name*/
,
const
String
&
/*new_table_name*/
,
virtual
void
rename
(
const
String
&
/*new_path_to_table_data*/
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
{
throw
Exception
(
"Method rename is not supported by storage "
+
getName
(),
ErrorCodes
::
NOT_IMPLEMENTED
);
renameInMemory
(
new_database_name
,
new_table_name
);
}
/**
* Just updates names of database and table without moving any data on disk
* Can be called only from DatabaseAtomic.
*/
virtual
void
renameInMemory
(
const
String
&
new_database_name
,
const
String
&
new_table_name
,
std
::
unique_lock
<
std
::
mutex
>
*
id_lock
=
nullptr
);
/** ALTER tables in the form of column changes that do not affect the change to Storage or its parameters.
* This method must fully execute the ALTER query, taking care of the locks itself.
* To update the table metadata on disk, this method should call InterpreterAlterQuery::updateMetadata.
...
...
dbms/src/Storages/Kafka/StorageKafka.cpp
浏览文件 @
cf5c998b
...
...
@@ -89,14 +89,12 @@ StorageKafka::StorageKafka(
UInt64
max_block_size_
,
size_t
skip_broken_
,
bool
intermediate_commit_
)
:
IStorage
(
:
IStorage
(
{
database_name_
,
table_name_
},
ColumnsDescription
({{
"_topic"
,
std
::
make_shared
<
DataTypeString
>
()},
{
"_key"
,
std
::
make_shared
<
DataTypeString
>
()},
{
"_offset"
,
std
::
make_shared
<
DataTypeUInt64
>
()},
{
"_partition"
,
std
::
make_shared
<
DataTypeUInt64
>
()},
{
"_timestamp"
,
std
::
make_shared
<
DataTypeNullable
>
(
std
::
make_shared
<
DataTypeDateTime
>
())}},
true
))
,
table_name
(
table_name_
)
,
database_name
(
database_name_
)
,
global_context
(
context_
.
getGlobalContext
())
,
kafka_context
(
Context
(
global_context
))
,
topics
(
global_context
.
getMacros
()
->
expand
(
topics_
))
...
...
@@ -195,14 +193,6 @@ void StorageKafka::shutdown()
task
->
deactivate
();
}
void
StorageKafka
::
rename
(
const
String
&
/* new_path_to_db */
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
{
table_name
=
new_table_name
;
database_name
=
new_database_name
;
}
void
StorageKafka
::
updateDependencies
()
{
task
->
activateAndSchedule
();
...
...
@@ -303,10 +293,10 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
}
}
bool
StorageKafka
::
checkDependencies
(
const
St
ring
&
current_database_name
,
const
String
&
current_table_name
)
bool
StorageKafka
::
checkDependencies
(
const
St
orageID
&
table_id
)
{
// Check if all dependencies are attached
auto
dependencies
=
global_context
.
getDependencies
(
current_database_name
,
current_table_name
);
auto
dependencies
=
global_context
.
getDependencies
(
table_id
.
database_name
,
table_id
.
database_name
);
//FIXME replace with id
if
(
dependencies
.
size
()
==
0
)
return
true
;
...
...
@@ -323,7 +313,7 @@ bool StorageKafka::checkDependencies(const String & current_database_name, const
return
false
;
// Check all its dependencies
if
(
!
checkDependencies
(
db_tab
.
first
,
db_tab
.
second
))
if
(
!
checkDependencies
(
StorageID
(
db_tab
.
first
,
db_tab
.
second
)))
//FIXME replace with id
return
false
;
}
...
...
@@ -334,13 +324,14 @@ void StorageKafka::threadFunc()
{
try
{
auto
table_id
=
getStorageID
();
// Check if at least one direct dependency is attached
auto
dependencies
=
global_context
.
getDependencies
(
database_name
,
table_name
);
auto
dependencies
=
global_context
.
getDependencies
(
table_id
.
database_name
,
table_id
.
database_name
);
//FIXME replace with id
// Keep streaming as long as there are attached views and streaming is not cancelled
while
(
!
stream_cancelled
&&
num_created_consumers
>
0
&&
dependencies
.
size
()
>
0
)
{
if
(
!
checkDependencies
(
database_name
,
table_name
))
if
(
!
checkDependencies
(
table_id
))
break
;
LOG_DEBUG
(
log
,
"Started streaming to "
<<
dependencies
.
size
()
<<
" attached views"
);
...
...
@@ -363,14 +354,16 @@ void StorageKafka::threadFunc()
bool
StorageKafka
::
streamToViews
()
{
auto
table
=
global_context
.
getTable
(
database_name
,
table_name
);
auto
table_id
=
getStorageID
();
auto
table
=
global_context
.
getTable
(
table_id
.
database_name
,
table_id
.
table_name
);
if
(
!
table
)
throw
Exception
(
"Engine table "
+
backQuote
(
database_name
)
+
"."
+
backQuote
(
table_name
)
+
" doesn't exist."
,
ErrorCodes
::
LOGICAL_ERROR
);
throw
Exception
(
"Engine table "
+
table_id
.
getNameForLogs
(
)
+
" doesn't exist."
,
ErrorCodes
::
LOGICAL_ERROR
);
// Create an INSERT query for streaming data
auto
insert
=
std
::
make_shared
<
ASTInsertQuery
>
();
insert
->
database
=
database_name
;
insert
->
table
=
table_name
;
//FIXME use uid if not empty
insert
->
database
=
table_id
.
database_name
;
insert
->
table
=
table_id
.
table_name
;
const
Settings
&
settings
=
global_context
.
getSettingsRef
();
size_t
block_size
=
max_block_size
;
...
...
dbms/src/Storages/Kafka/StorageKafka.h
浏览文件 @
cf5c998b
...
...
@@ -28,8 +28,6 @@ class StorageKafka : public ext::shared_ptr_helper<StorageKafka>, public IStorag
friend
struct
ext
::
shared_ptr_helper
<
StorageKafka
>
;
public:
std
::
string
getName
()
const
override
{
return
"Kafka"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
bool
supportsSettings
()
const
override
{
return
true
;
}
bool
noPushingToViews
()
const
override
{
return
true
;
}
...
...
@@ -49,8 +47,6 @@ public:
const
ASTPtr
&
query
,
const
Context
&
context
)
override
;
void
rename
(
const
String
&
/* new_path_to_db */
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
override
;
void
updateDependencies
()
override
;
void
pushReadBuffer
(
ConsumerBufferPtr
buf
);
...
...
@@ -79,8 +75,6 @@ protected:
private:
// Configuration and state
String
table_name
;
String
database_name
;
Context
global_context
;
Context
kafka_context
;
Names
topics
;
...
...
@@ -118,7 +112,7 @@ private:
void
threadFunc
();
bool
streamToViews
();
bool
checkDependencies
(
const
St
ring
&
database_name
,
const
String
&
table_name
);
bool
checkDependencies
(
const
St
orageID
&
table_id
);
};
}
dbms/src/Storages/LiveView/ProxyStorage.h
浏览文件 @
cf5c998b
...
...
@@ -9,11 +9,10 @@ class ProxyStorage : public IStorage
{
public:
ProxyStorage
(
StoragePtr
storage_
,
BlockInputStreams
streams_
,
QueryProcessingStage
::
Enum
to_stage_
)
:
storage
(
std
::
move
(
storage_
)),
streams
(
std
::
move
(
streams_
)),
to_stage
(
to_stage_
)
{}
:
IStorage
({
""
,
storage_
->
getTableName
()}),
storage
(
std
::
move
(
storage_
)),
streams
(
std
::
move
(
streams_
)),
to_stage
(
to_stage_
)
{}
public:
std
::
string
getName
()
const
override
{
return
"ProxyStorage("
+
storage
->
getName
()
+
")"
;
}
std
::
string
getTableName
()
const
override
{
return
storage
->
getTableName
();
}
bool
isRemote
()
const
override
{
return
storage
->
isRemote
();
}
bool
supportsSampling
()
const
override
{
return
storage
->
supportsSampling
();
}
...
...
dbms/src/Storages/LiveView/StorageLiveView.cpp
浏览文件 @
cf5c998b
...
...
@@ -198,8 +198,7 @@ StorageLiveView::StorageLiveView(
Context
&
local_context
,
const
ASTCreateQuery
&
query
,
const
ColumnsDescription
&
columns_
)
:
table_name
(
table_name_
),
database_name
(
database_name_
),
global_context
(
local_context
.
getGlobalContext
())
:
IStorage
({
database_name_
,
table_name_
}),
global_context
(
local_context
.
getGlobalContext
())
{
setColumns
(
columns_
);
...
...
@@ -225,7 +224,7 @@ StorageLiveView::StorageLiveView(
global_context
.
addDependency
(
DatabaseAndTableName
(
select_database_name
,
select_table_name
),
DatabaseAndTableName
(
database_name
,
table_name
));
DatabaseAndTableName
(
database_name
_
,
table_name_
));
//FIXME
is_temporary
=
query
.
temporary
;
temporary_live_view_timeout
=
local_context
.
getSettingsRef
().
temporary_live_view_timeout
.
totalSeconds
();
...
...
@@ -339,7 +338,8 @@ bool StorageLiveView::getNewBlocks()
void
StorageLiveView
::
checkTableCanBeDropped
()
const
{
Dependencies
dependencies
=
global_context
.
getDependencies
(
database_name
,
table_name
);
auto
table_id
=
getStorageID
();
Dependencies
dependencies
=
global_context
.
getDependencies
(
table_id
.
database_name
,
table_id
.
table_name
);
//FIXME
if
(
!
dependencies
.
empty
())
{
DatabaseAndTableName
database_and_table_name
=
dependencies
.
front
();
...
...
@@ -354,6 +354,7 @@ void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, co
if
(
storage
->
shutdown_called
)
return
;
auto
table_id
=
storage
->
getStorageID
();
{
while
(
1
)
{
...
...
@@ -365,7 +366,7 @@ void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, co
return
;
if
(
storage
->
hasUsers
())
return
;
if
(
!
storage
->
global_context
.
getDependencies
(
storage
->
database_name
,
storage
->
table_name
).
empty
())
if
(
!
storage
->
global_context
.
getDependencies
(
table_id
.
database_name
,
table_id
.
table_name
).
empty
())
//FIXME
continue
;
drop_table
=
true
;
}
...
...
@@ -375,14 +376,14 @@ void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, co
if
(
drop_table
)
{
if
(
storage
->
global_context
.
tryGetTable
(
storage
->
database_name
,
storage
->
table_name
))
if
(
storage
->
global_context
.
tryGetTable
(
table_id
.
database_name
,
table_id
.
table_name
))
//FIXME
{
try
{
/// We create and execute `drop` query for this table
auto
drop_query
=
std
::
make_shared
<
ASTDropQuery
>
();
drop_query
->
database
=
storage
->
database_name
;
drop_query
->
table
=
storage
->
table_name
;
drop_query
->
database
=
table_id
.
database_name
;
drop_query
->
table
=
table_id
.
table_name
;
drop_query
->
kind
=
ASTDropQuery
::
Kind
::
Drop
;
ASTPtr
ast_drop_query
=
drop_query
;
InterpreterDropQuery
drop_interpreter
(
ast_drop_query
,
storage
->
global_context
);
...
...
@@ -390,6 +391,7 @@ void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, co
}
catch
(...)
{
tryLogCurrentException
(
__PRETTY_FUNCTION__
);
}
}
}
...
...
@@ -466,9 +468,10 @@ StorageLiveView::~StorageLiveView()
void
StorageLiveView
::
drop
(
TableStructureWriteLockHolder
&
)
{
auto
table_id
=
getStorageID
();
global_context
.
removeDependency
(
DatabaseAndTableName
(
select_database_name
,
select_table_name
),
DatabaseAndTableName
(
database_name
,
table_name
));
DatabaseAndTableName
(
table_id
.
database_name
,
table_id
.
table_name
));
//FIXME
std
::
lock_guard
lock
(
mutex
);
is_dropped
=
true
;
...
...
dbms/src/Storages/LiveView/StorageLiveView.h
浏览文件 @
cf5c998b
...
...
@@ -41,8 +41,6 @@ friend class LiveViewBlockOutputStream;
public:
~
StorageLiveView
()
override
;
String
getName
()
const
override
{
return
"LiveView"
;
}
String
getTableName
()
const
override
{
return
table_name
;
}
String
getDatabaseName
()
const
override
{
return
database_name
;
}
String
getSelectDatabaseName
()
const
{
return
select_database_name
;
}
String
getSelectTableName
()
const
{
return
select_table_name
;
}
...
...
@@ -144,8 +142,6 @@ public:
private:
String
select_database_name
;
String
select_table_name
;
String
table_name
;
String
database_name
;
ASTPtr
inner_query
;
Context
&
global_context
;
bool
is_temporary
=
false
;
...
...
dbms/src/Storages/MergeTree/MergeTreeData.cpp
浏览文件 @
cf5c998b
...
...
@@ -128,16 +128,15 @@ MergeTreeData::MergeTreeData(
bool
require_part_metadata_
,
bool
attach
,
BrokenPartCallback
broken_part_callback_
)
:
global_context
(
context_
)
:
IStorage
({
database_
,
table_
})
,
global_context
(
context_
)
,
merging_params
(
merging_params_
)
,
partition_by_ast
(
partition_by_ast_
)
,
sample_by_ast
(
sample_by_ast_
)
,
require_part_metadata
(
require_part_metadata_
)
,
database_name
(
database_
)
,
table_name
(
table_
)
,
relative_data_path
(
relative_data_path_
)
,
broken_part_callback
(
broken_part_callback_
)
,
log_name
(
database_
name
+
"."
+
table_name
)
,
log_name
(
database_
+
"."
+
table_
)
,
log
(
&
Logger
::
get
(
log_name
))
,
storage_settings
(
std
::
move
(
storage_settings_
))
,
storage_policy
(
context_
.
getStoragePolicy
(
getSettings
()
->
storage_policy
))
...
...
@@ -1152,7 +1151,9 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
/// Data parts is still alive (since DataPartsVector holds shared_ptrs) and contain useful metainformation for logging
/// NOTE: There is no need to log parts deletion somewhere else, all deleting parts pass through this function and pass away
if
(
auto
part_log
=
global_context
.
getPartLog
(
database_name
))
auto
table_id
=
getStorageID
();
if
(
auto
part_log
=
global_context
.
getPartLog
(
table_id
.
database_name
))
//FIXME
{
PartLogElement
part_log_elem
;
...
...
@@ -1160,8 +1161,8 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
part_log_elem
.
event_time
=
time
(
nullptr
);
part_log_elem
.
duration_ms
=
0
;
part_log_elem
.
database_name
=
database_name
;
part_log_elem
.
table_name
=
table_name
;
part_log_elem
.
database_name
=
table_id
.
database_name
;
part_log_elem
.
table_name
=
table_
id
.
table_
name
;
for
(
auto
&
part
:
parts
)
{
...
...
@@ -1236,8 +1237,7 @@ void MergeTreeData::rename(
global_context
.
dropCaches
();
relative_data_path
=
new_table_path
;
table_name
=
new_table_name
;
database_name
=
new_database_name
;
renameInMemory
(
new_database_name
,
new_table_name
);
}
void
MergeTreeData
::
dropAllData
()
...
...
@@ -3396,7 +3396,8 @@ MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(const StoragePt
{
MergeTreeData
*
src_data
=
dynamic_cast
<
MergeTreeData
*>
(
source_table
.
get
());
if
(
!
src_data
)
throw
Exception
(
"Table "
+
table_name
+
" supports attachPartitionFrom only for MergeTree family of table engines."
throw
Exception
(
"Table "
+
source_table
->
getStorageID
().
getNameForLogs
()
+
" supports attachPartitionFrom only for MergeTree family of table engines."
" Got "
+
source_table
->
getName
(),
ErrorCodes
::
NOT_IMPLEMENTED
);
if
(
getColumns
().
getAllPhysical
().
sizeOfDifference
(
src_data
->
getColumns
().
getAllPhysical
()))
...
...
@@ -3557,7 +3558,8 @@ void MergeTreeData::writePartLog(
const
MergeListEntry
*
merge_entry
)
try
{
auto
part_log
=
global_context
.
getPartLog
(
database_name
);
auto
table_id
=
getStorageID
();
auto
part_log
=
global_context
.
getPartLog
(
table_id
.
database_name
);
if
(
!
part_log
)
return
;
...
...
@@ -3572,8 +3574,8 @@ try
/// TODO: Stop stopwatch in outer code to exclude ZK timings and so on
part_log_elem
.
duration_ms
=
elapsed_ns
/
1000000
;
part_log_elem
.
database_name
=
database_name
;
part_log_elem
.
table_name
=
table_name
;
part_log_elem
.
database_name
=
table_id
.
database_name
;
part_log_elem
.
table_name
=
table_
id
.
table_
name
;
part_log_elem
.
partition_id
=
MergeTreePartInfo
::
fromPartName
(
new_part_name
,
format_version
).
partition_id
;
part_log_elem
.
part_name
=
new_part_name
;
...
...
dbms/src/Storages/MergeTree/MergeTreeData.h
浏览文件 @
cf5c998b
...
...
@@ -401,9 +401,6 @@ public:
||
column_name
==
"_sample_factor"
;
}
String
getDatabaseName
()
const
override
{
return
database_name
;
}
String
getTableName
()
const
override
{
return
table_name
;
}
/// Load the set of data parts from disk. Call once - immediately after the object is created.
void
loadDataParts
(
bool
skip_sanity_checks
);
...
...
@@ -790,8 +787,6 @@ protected:
bool
require_part_metadata
;
String
database_name
;
String
table_name
;
String
relative_data_path
;
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp
浏览文件 @
cf5c998b
...
...
@@ -25,7 +25,7 @@ static const auto ALTER_ERROR_SLEEP_MS = 10 * 1000;
ReplicatedMergeTreeAlterThread
::
ReplicatedMergeTreeAlterThread
(
StorageReplicatedMergeTree
&
storage_
)
:
storage
(
storage_
)
,
zk_node_cache
([
&
]
{
return
storage
.
getZooKeeper
();
})
,
log_name
(
storage
.
database_name
+
"."
+
storage
.
table_name
+
" (ReplicatedMergeTreeAlterThread)"
)
,
log_name
(
storage
.
getStorageID
().
getFullTableName
()
+
" (ReplicatedMergeTreeAlterThread)"
)
,
log
(
&
Logger
::
get
(
log_name
))
{
task
=
storage_
.
global_context
.
getSchedulePool
().
createTask
(
log_name
,
[
this
]{
run
();
});
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
浏览文件 @
cf5c998b
...
...
@@ -19,7 +19,7 @@ namespace ErrorCodes
ReplicatedMergeTreeCleanupThread
::
ReplicatedMergeTreeCleanupThread
(
StorageReplicatedMergeTree
&
storage_
)
:
storage
(
storage_
)
,
log_name
(
storage
.
database_name
+
"."
+
storage
.
table_name
+
" (ReplicatedMergeTreeCleanupThread)"
)
,
log_name
(
storage
.
getStorageID
().
getFullTableName
()
+
" (ReplicatedMergeTreeCleanupThread)"
)
,
log
(
&
Logger
::
get
(
log_name
))
{
task
=
storage
.
global_context
.
getSchedulePool
().
createTask
(
log_name
,
[
this
]{
run
();
});
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
浏览文件 @
cf5c998b
...
...
@@ -24,7 +24,7 @@ static const auto PART_CHECK_ERROR_SLEEP_MS = 5 * 1000;
ReplicatedMergeTreePartCheckThread
::
ReplicatedMergeTreePartCheckThread
(
StorageReplicatedMergeTree
&
storage_
)
:
storage
(
storage_
)
,
log_name
(
storage
.
database_name
+
"."
+
storage
.
table_name
+
" (ReplicatedMergeTreePartCheckThread)"
)
,
log_name
(
storage
.
getStorageID
().
getFullTableName
()
+
" (ReplicatedMergeTreePartCheckThread)"
)
,
log
(
&
Logger
::
get
(
log_name
))
{
task
=
storage
.
global_context
.
getSchedulePool
().
createTask
(
log_name
,
[
this
]
{
run
();
});
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
浏览文件 @
cf5c998b
...
...
@@ -40,7 +40,7 @@ static String generateActiveNodeIdentifier()
ReplicatedMergeTreeRestartingThread
::
ReplicatedMergeTreeRestartingThread
(
StorageReplicatedMergeTree
&
storage_
)
:
storage
(
storage_
)
,
log_name
(
storage
.
database_name
+
"."
+
storage
.
table_name
+
" (ReplicatedMergeTreeRestartingThread)"
)
,
log_name
(
storage
.
getStorageID
().
getFullTableName
()
+
" (ReplicatedMergeTreeRestartingThread)"
)
,
log
(
&
Logger
::
get
(
log_name
))
,
active_node_identifier
(
generateActiveNodeIdentifier
())
{
...
...
dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h
浏览文件 @
cf5c998b
...
...
@@ -18,8 +18,6 @@ class StorageFromMergeTreeDataPart : public ext::shared_ptr_helper<StorageFromMe
friend
struct
ext
::
shared_ptr_helper
<
StorageFromMergeTreeDataPart
>
;
public:
String
getName
()
const
override
{
return
"FromMergeTreeDataPart"
;
}
String
getTableName
()
const
override
{
return
part
->
storage
.
getTableName
()
+
" (part "
+
part
->
name
+
")"
;
}
String
getDatabaseName
()
const
override
{
return
part
->
storage
.
getDatabaseName
();
}
BlockInputStreams
read
(
const
Names
&
column_names
,
...
...
@@ -51,7 +49,8 @@ public:
protected:
StorageFromMergeTreeDataPart
(
const
MergeTreeData
::
DataPartPtr
&
part_
)
:
IStorage
(
part_
->
storage
.
getVirtuals
()),
part
(
part_
)
:
IStorage
({
part_
->
storage
.
getDatabaseName
(),
part_
->
storage
.
getTableName
()
+
" (part "
+
part_
->
name
+
")"
},
part_
->
storage
.
getVirtuals
()),
part
(
part_
)
{
setColumns
(
part_
->
storage
.
getColumns
());
setIndices
(
part_
->
storage
.
getIndices
());
...
...
dbms/src/Storages/StorageBuffer.cpp
浏览文件 @
cf5c998b
...
...
@@ -61,13 +61,13 @@ StorageBuffer::StorageBuffer(const std::string & database_name_, const std::stri
Context
&
context_
,
size_t
num_shards_
,
const
Thresholds
&
min_thresholds_
,
const
Thresholds
&
max_thresholds_
,
const
String
&
destination_database_
,
const
String
&
destination_table_
,
bool
allow_materialized_
)
:
table_name
(
table_name_
),
database_name
(
database_name_
),
global_context
(
context_
),
:
IStorage
({
database_name_
,
table_name_
}),
global_context
(
context_
),
num_shards
(
num_shards_
),
buffers
(
num_shards_
),
min_thresholds
(
min_thresholds_
),
max_thresholds
(
max_thresholds_
),
destination_database
(
destination_database_
),
destination_table
(
destination_table_
),
no_destination
(
destination_database
.
empty
()
&&
destination_table
.
empty
()),
allow_materialized
(
allow_materialized_
),
log
(
&
Logger
::
get
(
"StorageBuffer ("
+
table_name
+
")"
))
allow_materialized
(
allow_materialized_
),
log
(
&
Logger
::
get
(
"StorageBuffer ("
+
table_name
_
+
")"
))
{
setColumns
(
columns_
);
setConstraints
(
constraints_
);
...
...
dbms/src/Storages/StorageBuffer.h
浏览文件 @
cf5c998b
...
...
@@ -53,8 +53,6 @@ public:
};
std
::
string
getName
()
const
override
{
return
"Buffer"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
QueryProcessingStage
::
Enum
getQueryProcessingStage
(
const
Context
&
context
)
const
override
;
...
...
@@ -73,12 +71,6 @@ public:
void
shutdown
()
override
;
bool
optimize
(
const
ASTPtr
&
query
,
const
ASTPtr
&
partition
,
bool
final
,
bool
deduplicate
,
const
Context
&
context
)
override
;
void
rename
(
const
String
&
/*new_path_to_db*/
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
override
{
table_name
=
new_table_name
;
database_name
=
new_database_name
;
}
bool
supportsSampling
()
const
override
{
return
true
;
}
bool
supportsPrewhere
()
const
override
{
...
...
@@ -101,9 +93,6 @@ public:
~
StorageBuffer
()
override
;
private:
String
table_name
;
String
database_name
;
Context
global_context
;
struct
Buffer
...
...
dbms/src/Storages/StorageDictionary.cpp
浏览文件 @
cf5c998b
...
...
@@ -30,8 +30,7 @@ StorageDictionary::StorageDictionary(
const
Context
&
context
,
bool
attach
,
const
String
&
dictionary_name_
)
:
table_name
(
table_name_
),
database_name
(
database_name_
),
:
IStorage
({
database_name_
,
table_name_
}),
dictionary_name
(
dictionary_name_
),
logger
(
&
Poco
::
Logger
::
get
(
"StorageDictionary"
))
{
...
...
dbms/src/Storages/StorageDictionary.h
浏览文件 @
cf5c998b
...
...
@@ -24,8 +24,6 @@ class StorageDictionary : public ext::shared_ptr_helper<StorageDictionary>, publ
friend
struct
ext
::
shared_ptr_helper
<
StorageDictionary
>
;
public:
std
::
string
getName
()
const
override
{
return
"Dictionary"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
const
SelectQueryInfo
&
query_info
,
...
...
@@ -59,8 +57,6 @@ public:
private:
using
Ptr
=
MultiVersion
<
IDictionaryBase
>::
Version
;
String
table_name
;
String
database_name
;
String
dictionary_name
;
Poco
::
Logger
*
logger
;
...
...
dbms/src/Storages/StorageDistributed.cpp
浏览文件 @
cf5c998b
...
...
@@ -216,10 +216,9 @@ StorageDistributed::StorageDistributed(
const
ASTPtr
&
sharding_key_
,
const
String
&
relative_data_path_
,
bool
attach_
)
:
IStorage
(
ColumnsDescription
({
:
IStorage
(
{
database_name_
,
table_name_
},
ColumnsDescription
({
{
"_shard_num"
,
std
::
make_shared
<
DataTypeUInt32
>
()},
},
true
)),
table_name
(
table_name_
),
database_name
(
database_name_
),
remote_database
(
remote_database_
),
remote_table
(
remote_table_
),
global_context
(
context_
),
cluster_name
(
global_context
.
getMacros
()
->
expand
(
cluster_name_
)),
has_sharding_key
(
sharding_key_
),
path
(
relative_data_path_
.
empty
()
?
""
:
(
context_
.
getPath
()
+
relative_data_path_
))
...
...
@@ -237,8 +236,8 @@ StorageDistributed::StorageDistributed(
if
(
!
attach_
&&
!
cluster_name
.
empty
())
{
size_t
num_local_shards
=
global_context
.
getCluster
(
cluster_name
)
->
getLocalShardCount
();
if
(
num_local_shards
&&
remote_database
==
database_name
&&
remote_table
==
table_name
)
throw
Exception
(
"Distributed table "
+
table_name
+
" looks at itself"
,
ErrorCodes
::
INFINITE_LOOP
);
if
(
num_local_shards
&&
remote_database
==
database_name
_
&&
remote_table
==
table_name_
)
throw
Exception
(
"Distributed table "
+
table_name
_
+
" looks at itself"
,
ErrorCodes
::
INFINITE_LOOP
);
}
}
...
...
@@ -342,17 +341,18 @@ BlockInputStreams StorageDistributed::read(
if
(
has_sharding_key
)
{
auto
smaller_cluster
=
skipUnusedShards
(
cluster
,
query_info
);
auto
storage_id
=
getStorageID
();
if
(
smaller_cluster
)
{
cluster
=
smaller_cluster
;
LOG_DEBUG
(
log
,
"Reading from "
<<
database_name
<<
"."
<<
table_name
<<
": "
LOG_DEBUG
(
log
,
"Reading from "
<<
storage_id
.
getNameForLogs
()
<<
": "
"Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): "
" "
<<
makeFormattedListOfShards
(
cluster
));
}
else
{
LOG_DEBUG
(
log
,
"Reading from "
<<
database_name
<<
"."
<<
table_name
<<
": "
LOG_DEBUG
(
log
,
"Reading from "
<<
storage_id
.
getNameForLogs
()
<<
": "
"Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the cluster"
);
}
}
...
...
@@ -599,8 +599,6 @@ void StorageDistributed::flushClusterNodesAllData()
void
StorageDistributed
::
rename
(
const
String
&
new_path_to_table_data
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
{
table_name
=
new_table_name
;
database_name
=
new_database_name
;
if
(
!
path
.
empty
())
{
auto
new_path
=
global_context
.
getPath
()
+
new_path_to_table_data
;
...
...
@@ -610,6 +608,7 @@ void StorageDistributed::rename(const String & new_path_to_table_data, const Str
for
(
auto
&
node
:
cluster_nodes_data
)
node
.
second
.
directory_monitor
->
updatePath
();
}
renameInMemory
(
new_database_name
,
new_table_name
);
}
...
...
dbms/src/Storages/StorageDistributed.h
浏览文件 @
cf5c998b
...
...
@@ -52,8 +52,6 @@ public:
const
Context
&
context_
);
std
::
string
getName
()
const
override
{
return
"Distributed"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
bool
supportsSampling
()
const
override
{
return
true
;
}
bool
supportsFinal
()
const
override
{
return
true
;
}
...
...
@@ -115,8 +113,6 @@ public:
ActionLock
getActionLock
(
StorageActionBlockType
type
)
override
;
String
table_name
;
String
database_name
;
String
remote_database
;
String
remote_table
;
ASTPtr
remote_table_function_ptr
;
...
...
dbms/src/Storages/StorageFile.cpp
浏览文件 @
cf5c998b
...
...
@@ -165,8 +165,8 @@ StorageFile::StorageFile(const std::string & relative_table_dir_path, CommonArgu
}
StorageFile
::
StorageFile
(
CommonArguments
args
)
:
table_name
(
args
.
table_name
),
database_name
(
args
.
database_name
),
format_name
(
args
.
format_name
)
,
compression_method
(
args
.
compression_method
),
base_path
(
args
.
context
.
getPath
())
:
IStorage
({
args
.
database_name
,
args
.
table_name
}),
format_name
(
args
.
format_name
),
compression_method
(
args
.
compression_method
),
base_path
(
args
.
context
.
getPath
())
{
setColumns
(
args
.
columns
);
setConstraints
(
args
.
constraints
);
...
...
@@ -292,7 +292,8 @@ public:
else
{
if
(
storage
.
paths
.
size
()
!=
1
)
throw
Exception
(
"Table '"
+
storage
.
table_name
+
"' is in readonly mode because of globs in filepath"
,
ErrorCodes
::
DATABASE_ACCESS_DENIED
);
throw
Exception
(
"Table "
+
storage
.
getStorageID
().
getNameForLogs
()
+
" is in readonly mode because of globs in filepath"
,
ErrorCodes
::
DATABASE_ACCESS_DENIED
);
write_buf
=
getWriteBuffer
<
WriteBufferFromFile
>
(
compression_method
,
storage
.
paths
[
0
],
DBMS_DEFAULT_BUFFER_SIZE
,
O_WRONLY
|
O_APPEND
|
O_CREAT
);
}
...
...
@@ -339,17 +340,17 @@ BlockOutputStreamPtr StorageFile::write(
Strings
StorageFile
::
getDataPaths
()
const
{
if
(
paths
.
empty
())
throw
Exception
(
"Table '"
+
table_name
+
"' is in readonly mode"
,
ErrorCodes
::
DATABASE_ACCESS_DENIED
);
throw
Exception
(
"Table '"
+
getStorageID
().
getNameForLogs
()
+
"' is in readonly mode"
,
ErrorCodes
::
DATABASE_ACCESS_DENIED
);
return
paths
;
}
void
StorageFile
::
rename
(
const
String
&
new_path_to_table_data
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
{
if
(
!
is_db_table
)
throw
Exception
(
"Can't rename table
'"
+
table_name
+
"'
binded to user-defined file (or FD)"
,
ErrorCodes
::
DATABASE_ACCESS_DENIED
);
throw
Exception
(
"Can't rename table
"
+
getStorageID
().
getNameForLogs
()
+
"
binded to user-defined file (or FD)"
,
ErrorCodes
::
DATABASE_ACCESS_DENIED
);
if
(
paths
.
size
()
!=
1
)
throw
Exception
(
"Can't rename table
'"
+
table_name
+
"'
in readonly mode"
,
ErrorCodes
::
DATABASE_ACCESS_DENIED
);
throw
Exception
(
"Can't rename table
"
+
getStorageID
().
getNameForLogs
()
+
"
in readonly mode"
,
ErrorCodes
::
DATABASE_ACCESS_DENIED
);
std
::
unique_lock
<
std
::
shared_mutex
>
lock
(
rwlock
);
...
...
@@ -358,8 +359,7 @@ void StorageFile::rename(const String & new_path_to_table_data, const String & n
Poco
::
File
(
paths
[
0
]).
renameTo
(
path_new
);
paths
[
0
]
=
std
::
move
(
path_new
);
table_name
=
new_table_name
;
database_name
=
new_database_name
;
renameInMemory
(
new_database_name
,
new_table_name
);
}
...
...
dbms/src/Storages/StorageFile.h
浏览文件 @
cf5c998b
...
...
@@ -23,8 +23,6 @@ class StorageFile : public ext::shared_ptr_helper<StorageFile>, public IStorage
friend
struct
ext
::
shared_ptr_helper
<
StorageFile
>
;
public:
std
::
string
getName
()
const
override
{
return
"File"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
...
...
@@ -69,8 +67,6 @@ protected:
private:
explicit
StorageFile
(
CommonArguments
args
);
std
::
string
table_name
;
std
::
string
database_name
;
std
::
string
format_name
;
int
table_fd
=
-
1
;
...
...
dbms/src/Storages/StorageHDFS.cpp
浏览文件 @
cf5c998b
...
...
@@ -42,10 +42,9 @@ StorageHDFS::StorageHDFS(const String & uri_,
const
ConstraintsDescription
&
constraints_
,
Context
&
context_
,
const
String
&
compression_method_
=
""
)
:
uri
(
uri_
)
:
IStorage
({
database_name_
,
table_name_
})
,
uri
(
uri_
)
,
format_name
(
format_name_
)
,
table_name
(
table_name_
)
,
database_name
(
database_name_
)
,
context
(
context_
)
,
compression_method
(
compression_method_
)
{
...
...
@@ -219,12 +218,6 @@ BlockInputStreams StorageHDFS::read(
return
narrowBlockInputStreams
(
result
,
num_streams
);
}
void
StorageHDFS
::
rename
(
const
String
&
/*new_path_to_db*/
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
{
table_name
=
new_table_name
;
database_name
=
new_database_name
;
}
BlockOutputStreamPtr
StorageHDFS
::
write
(
const
ASTPtr
&
/*query*/
,
const
Context
&
/*context*/
)
{
return
std
::
make_shared
<
HDFSBlockOutputStream
>
(
uri
,
...
...
dbms/src/Storages/StorageHDFS.h
浏览文件 @
cf5c998b
...
...
@@ -18,8 +18,6 @@ class StorageHDFS : public ext::shared_ptr_helper<StorageHDFS>, public IStorage
friend
struct
ext
::
shared_ptr_helper
<
StorageHDFS
>
;
public:
String
getName
()
const
override
{
return
"HDFS"
;
}
String
getTableName
()
const
override
{
return
table_name
;
}
String
getDatabaseName
()
const
override
{
return
database_name
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
const
SelectQueryInfo
&
query_info
,
...
...
@@ -30,8 +28,6 @@ public:
BlockOutputStreamPtr
write
(
const
ASTPtr
&
query
,
const
Context
&
context
)
override
;
void
rename
(
const
String
&
new_path_to_db
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
override
;
protected:
StorageHDFS
(
const
String
&
uri_
,
const
String
&
database_name_
,
...
...
@@ -45,8 +41,6 @@ protected:
private:
String
uri
;
String
format_name
;
String
table_name
;
String
database_name
;
Context
&
context
;
String
compression_method
;
...
...
dbms/src/Storages/StorageInput.cpp
浏览文件 @
cf5c998b
...
...
@@ -16,7 +16,7 @@ namespace ErrorCodes
}
StorageInput
::
StorageInput
(
const
String
&
table_name_
,
const
ColumnsDescription
&
columns_
)
:
IStorage
(
columns_
),
table_name
(
table_name
_
)
:
IStorage
(
{
""
,
table_name_
},
columns
_
)
{
setColumns
(
columns_
);
}
...
...
dbms/src/Storages/StorageInput.h
浏览文件 @
cf5c998b
...
...
@@ -13,7 +13,6 @@ class StorageInput : public ext::shared_ptr_helper<StorageInput>, public IStorag
friend
struct
ext
::
shared_ptr_helper
<
StorageInput
>
;
public:
String
getName
()
const
override
{
return
"Input"
;
}
String
getTableName
()
const
override
{
return
table_name
;
}
/// A table will read from this stream.
void
setInputStream
(
BlockInputStreamPtr
input_stream_
);
...
...
@@ -27,7 +26,6 @@ public:
unsigned
num_streams
)
override
;
private:
String
table_name
;
BlockInputStreamPtr
input_stream
;
protected:
...
...
dbms/src/Storages/StorageJoin.cpp
浏览文件 @
cf5c998b
...
...
@@ -71,7 +71,7 @@ void StorageJoin::truncate(const ASTPtr &, const Context &, TableStructureWriteL
HashJoinPtr
StorageJoin
::
getJoin
(
std
::
shared_ptr
<
AnalyzedJoin
>
analyzed_join
)
const
{
if
(
!
(
kind
==
analyzed_join
->
kind
()
&&
strictness
==
analyzed_join
->
strictness
()))
throw
Exception
(
"Table "
+
table_name
+
" has incompatible type of JOIN."
,
ErrorCodes
::
INCOMPATIBLE_TYPE_OF_JOIN
);
throw
Exception
(
"Table "
+
getStorageID
().
getNameForLogs
()
+
" has incompatible type of JOIN."
,
ErrorCodes
::
INCOMPATIBLE_TYPE_OF_JOIN
);
/// TODO: check key columns
...
...
dbms/src/Storages/StorageLog.cpp
浏览文件 @
cf5c998b
...
...
@@ -425,7 +425,7 @@ StorageLog::StorageLog(
const
ConstraintsDescription
&
constraints_
,
size_t
max_compress_block_size_
,
const
Context
&
context_
)
:
base_path
(
context_
.
getPath
()),
path
(
base_path
+
relative_path_
),
table_name
(
table_name_
),
database_name
(
database_name
_
),
:
IStorage
({
database_name_
,
table_name_
}),
base_path
(
context_
.
getPath
()),
path
(
base_path
+
relative_path
_
),
max_compress_block_size
(
max_compress_block_size_
),
file_checker
(
path
+
"sizes.json"
)
{
...
...
@@ -522,14 +522,13 @@ void StorageLog::rename(const String & new_path_to_table_data, const String & ne
Poco
::
File
(
path
).
renameTo
(
new_path
);
path
=
new_path
;
table_name
=
new_table_name
;
database_name
=
new_database_name
;
file_checker
.
setPath
(
path
+
"sizes.json"
);
for
(
auto
&
file
:
files
)
file
.
second
.
data_file
=
Poco
::
File
(
path
+
Poco
::
Path
(
file
.
second
.
data_file
.
path
()).
getFileName
());
marks_file
=
Poco
::
File
(
path
+
DBMS_STORAGE_LOG_MARKS_FILE_NAME
);
renameInMemory
(
new_database_name
,
new_table_name
);
}
void
StorageLog
::
truncate
(
const
ASTPtr
&
,
const
Context
&
,
TableStructureWriteLockHolder
&
)
...
...
dbms/src/Storages/StorageLog.h
浏览文件 @
cf5c998b
...
...
@@ -25,8 +25,6 @@ friend struct ext::shared_ptr_helper<StorageLog>;
public:
std
::
string
getName
()
const
override
{
return
"Log"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
...
...
@@ -65,8 +63,6 @@ protected:
private:
String
base_path
;
String
path
;
String
table_name
;
String
database_name
;
mutable
std
::
shared_mutex
rwlock
;
...
...
@@ -115,7 +111,6 @@ private:
void
loadMarks
();
/// The order of adding files should not change: it corresponds to the order of the columns in the marks file.
void
addFile
(
const
String
&
column_name
,
const
IDataType
&
type
,
size_t
level
=
0
);
/** For normal columns, the number of rows in the block is specified in the marks.
* For array columns and nested structures, there are more than one group of marks that correspond to different files
...
...
dbms/src/Storages/StorageMaterializedView.cpp
浏览文件 @
cf5c998b
...
...
@@ -100,8 +100,7 @@ StorageMaterializedView::StorageMaterializedView(
const
ASTCreateQuery
&
query
,
const
ColumnsDescription
&
columns_
,
bool
attach_
)
:
table_name
(
table_name_
),
database_name
(
database_name_
),
global_context
(
local_context
.
getGlobalContext
())
:
IStorage
({
database_name_
,
table_name_
}),
global_context
(
local_context
.
getGlobalContext
())
{
setColumns
(
columns_
);
...
...
@@ -127,7 +126,7 @@ StorageMaterializedView::StorageMaterializedView(
if
(
!
select_table_name
.
empty
())
global_context
.
addDependency
(
DatabaseAndTableName
(
select_database_name
,
select_table_name
),
DatabaseAndTableName
(
database_name
,
table_name
));
DatabaseAndTableName
(
database_name
_
,
table_name_
));
//FIXME
// If the destination table is not set, use inner table
if
(
!
query
.
to_table
.
empty
())
...
...
@@ -137,8 +136,8 @@ StorageMaterializedView::StorageMaterializedView(
}
else
{
target_database_name
=
database_name
;
target_table_name
=
generateInnerTableName
(
table_name
);
target_database_name
=
database_name
_
;
target_table_name
=
generateInnerTableName
(
table_name
_
);
has_inner_table
=
true
;
}
...
...
@@ -169,7 +168,7 @@ StorageMaterializedView::StorageMaterializedView(
if
(
!
select_table_name
.
empty
())
global_context
.
removeDependency
(
DatabaseAndTableName
(
select_database_name
,
select_table_name
),
DatabaseAndTableName
(
database_name
,
table_name
));
DatabaseAndTableName
(
database_name
_
,
table_name_
));
//FIXME
throw
;
}
...
...
@@ -238,9 +237,10 @@ static void executeDropQuery(ASTDropQuery::Kind kind, Context & global_context,
void
StorageMaterializedView
::
drop
(
TableStructureWriteLockHolder
&
)
{
auto
table_id
=
getStorageID
();
global_context
.
removeDependency
(
DatabaseAndTableName
(
select_database_name
,
select_table_name
),
DatabaseAndTableName
(
database_name
,
table_name
));
DatabaseAndTableName
(
table_id
.
database_name
,
table_id
.
table_name
));
//FIXME
if
(
has_inner_table
&&
tryGetTargetTable
())
executeDropQuery
(
ASTDropQuery
::
Kind
::
Drop
,
global_context
,
target_database_name
,
target_table_name
);
...
...
@@ -314,25 +314,26 @@ void StorageMaterializedView::rename(
}
auto
lock
=
global_context
.
getLock
();
auto
table_id
=
getStorageID
();
global_context
.
removeDependencyUnsafe
(
DatabaseAndTableName
(
select_database_name
,
select_table_name
),
DatabaseAndTableName
(
database_name
,
table_name
));
DatabaseAndTableName
(
table_id
.
database_name
,
table_id
.
table_name
));
table_name
=
new_table_name
;
database_name
=
new_database_name
;
IStorage
::
renameInMemory
(
new_database_name
,
new_table_name
);
global_context
.
addDependencyUnsafe
(
DatabaseAndTableName
(
select_database_name
,
select_table_name
),
DatabaseAndTableName
(
database_name
,
table_name
));
DatabaseAndTableName
(
new_database_name
,
new_
table_name
));
}
void
StorageMaterializedView
::
shutdown
()
{
auto
table_id
=
getStorageID
();
/// Make sure the dependency is removed after DETACH TABLE
global_context
.
removeDependency
(
DatabaseAndTableName
(
select_database_name
,
select_table_name
),
DatabaseAndTableName
(
database_name
,
table_name
));
DatabaseAndTableName
(
table_id
.
database_name
,
table_id
.
table_name
));
//FIXME
}
StoragePtr
StorageMaterializedView
::
getTargetTable
()
const
...
...
dbms/src/Storages/StorageMaterializedView.h
浏览文件 @
cf5c998b
...
...
@@ -14,8 +14,6 @@ class StorageMaterializedView : public ext::shared_ptr_helper<StorageMaterialize
friend
struct
ext
::
shared_ptr_helper
<
StorageMaterializedView
>
;
public:
std
::
string
getName
()
const
override
{
return
"MaterializedView"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
ASTPtr
getInnerQuery
()
const
{
return
inner_query
->
clone
();
}
...
...
@@ -72,8 +70,6 @@ private:
String
select_table_name
;
String
target_database_name
;
String
target_table_name
;
String
table_name
;
String
database_name
;
ASTPtr
inner_query
;
Context
&
global_context
;
bool
has_inner_table
=
false
;
...
...
dbms/src/Storages/StorageMemory.cpp
浏览文件 @
cf5c998b
...
...
@@ -75,7 +75,7 @@ private:
StorageMemory
::
StorageMemory
(
String
database_name_
,
String
table_name_
,
ColumnsDescription
columns_description_
,
ConstraintsDescription
constraints_
)
:
database_name
(
std
::
move
(
database_name_
)),
table_name
(
std
::
move
(
table_name_
)
)
:
IStorage
({
database_name_
,
table_name_
}
)
{
setColumns
(
std
::
move
(
columns_description_
));
setConstraints
(
std
::
move
(
constraints_
));
...
...
dbms/src/Storages/StorageMemory.h
浏览文件 @
cf5c998b
...
...
@@ -25,8 +25,6 @@ friend struct ext::shared_ptr_helper<StorageMemory>;
public:
String
getName
()
const
override
{
return
"Memory"
;
}
String
getTableName
()
const
override
{
return
table_name
;
}
String
getDatabaseName
()
const
override
{
return
database_name
;
}
size_t
getSize
()
const
{
return
data
.
size
();
}
...
...
@@ -44,16 +42,7 @@ public:
void
truncate
(
const
ASTPtr
&
,
const
Context
&
,
TableStructureWriteLockHolder
&
)
override
;
void
rename
(
const
String
&
/*new_path_to_db*/
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
override
{
table_name
=
new_table_name
;
database_name
=
new_database_name
;
}
private:
String
database_name
;
String
table_name
;
/// The data itself. `list` - so that when inserted to the end, the existing iterators are not invalidated.
BlocksList
data
;
...
...
dbms/src/Storages/StorageMerge.cpp
浏览文件 @
cf5c998b
...
...
@@ -52,9 +52,7 @@ StorageMerge::StorageMerge(
const
String
&
source_database_
,
const
String
&
table_name_regexp_
,
const
Context
&
context_
)
:
IStorage
(
ColumnsDescription
({{
"_table"
,
std
::
make_shared
<
DataTypeString
>
()}},
true
))
,
table_name
(
table_name_
)
,
database_name
(
database_name_
)
:
IStorage
({
database_name_
,
table_name_
},
ColumnsDescription
({{
"_table"
,
std
::
make_shared
<
DataTypeString
>
()}},
true
))
,
source_database
(
source_database_
)
,
table_name_regexp
(
table_name_regexp_
)
,
global_context
(
context_
)
...
...
@@ -417,12 +415,13 @@ void StorageMerge::alter(
const
AlterCommands
&
params
,
const
Context
&
context
,
TableStructureWriteLockHolder
&
table_lock_holder
)
{
lockStructureExclusively
(
table_lock_holder
,
context
.
getCurrentQueryId
());
auto
table_id
=
getStorageID
();
auto
new_columns
=
getColumns
();
auto
new_indices
=
getIndices
();
auto
new_constraints
=
getConstraints
();
params
.
applyForColumnsOnly
(
new_columns
);
context
.
getDatabase
(
database_name
)
->
alterTable
(
context
,
table_name
,
new_columns
,
new_indices
,
new_constraints
,
{});
context
.
getDatabase
(
table_id
.
database_name
)
->
alterTable
(
context
,
table_id
.
table_name
,
new_columns
,
new_indices
,
new_constraints
,
{});
setColumns
(
new_columns
);
}
...
...
dbms/src/Storages/StorageMerge.h
浏览文件 @
cf5c998b
...
...
@@ -17,8 +17,6 @@ class StorageMerge : public ext::shared_ptr_helper<StorageMerge>, public IStorag
friend
struct
ext
::
shared_ptr_helper
<
StorageMerge
>
;
public:
std
::
string
getName
()
const
override
{
return
"Merge"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
bool
isRemote
()
const
override
;
...
...
@@ -42,12 +40,6 @@ public:
size_t
max_block_size
,
unsigned
num_streams
)
override
;
void
rename
(
const
String
&
/*new_path_to_db*/
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
override
{
table_name
=
new_table_name
;
database_name
=
new_database_name
;
}
/// you need to add and remove columns in the sub-tables manually
/// the structure of sub-tables is not checked
void
alter
(
...
...
@@ -56,8 +48,6 @@ public:
bool
mayBenefitFromIndexForIn
(
const
ASTPtr
&
left_in_operand
,
const
Context
&
query_context
)
const
override
;
private:
String
table_name
;
String
database_name
;
String
source_database
;
OptimizedRegularExpression
table_name_regexp
;
Context
global_context
;
...
...
dbms/src/Storages/StorageMergeTree.cpp
浏览文件 @
cf5c998b
...
...
@@ -149,12 +149,14 @@ BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Con
void
StorageMergeTree
::
checkTableCanBeDropped
()
const
{
auto
table_id
=
getStorageID
();
const_cast
<
StorageMergeTree
&>
(
*
this
).
recalculateColumnSizes
();
global_context
.
checkTableCanBeDropped
(
database_name
,
table_name
,
getTotalActiveSizeInBytes
());
global_context
.
checkTableCanBeDropped
(
table_id
.
database_name
,
table_id
.
table_name
,
getTotalActiveSizeInBytes
());
}
void
StorageMergeTree
::
checkPartitionCanBeDropped
(
const
ASTPtr
&
partition
)
{
auto
table_id
=
getStorageID
();
const_cast
<
StorageMergeTree
&>
(
*
this
).
recalculateColumnSizes
();
const
String
partition_id
=
getPartitionIDFromQuery
(
partition
,
global_context
);
...
...
@@ -166,7 +168,7 @@ void StorageMergeTree::checkPartitionCanBeDropped(const ASTPtr & partition)
{
partition_size
+=
part
->
bytes_on_disk
;
}
global_context
.
checkPartitionCanBeDropped
(
database_name
,
table_name
,
partition_size
);
global_context
.
checkPartitionCanBeDropped
(
table_id
.
database_name
,
table_id
.
table_name
,
partition_size
);
}
void
StorageMergeTree
::
drop
(
TableStructureWriteLockHolder
&
)
...
...
@@ -647,7 +649,9 @@ bool StorageMergeTree::merge(
merging_tagger
.
emplace
(
future_part
,
MergeTreeDataMergerMutator
::
estimateNeededDiskSpace
(
future_part
.
parts
),
*
this
,
false
);
}
MergeList
::
EntryPtr
merge_entry
=
global_context
.
getMergeList
().
insert
(
database_name
,
table_name
,
future_part
);
auto
table_id
=
getStorageID
();
//FIXME
MergeList
::
EntryPtr
merge_entry
=
global_context
.
getMergeList
().
insert
(
table_id
.
database_name
,
table_id
.
table_name
,
future_part
);
/// Logging
Stopwatch
stopwatch
;
...
...
@@ -774,7 +778,8 @@ bool StorageMergeTree::tryMutatePart()
if
(
!
tagger
)
return
false
;
MergeList
::
EntryPtr
merge_entry
=
global_context
.
getMergeList
().
insert
(
database_name
,
table_name
,
future_part
);
auto
table_id
=
getStorageID
();
MergeList
::
EntryPtr
merge_entry
=
global_context
.
getMergeList
().
insert
(
table_id
.
database_name
,
table_id
.
table_name
,
future_part
);
Stopwatch
stopwatch
;
MutableDataPartPtr
new_part
;
...
...
dbms/src/Storages/StorageMergeTree.h
浏览文件 @
cf5c998b
...
...
@@ -32,8 +32,6 @@ public:
~
StorageMergeTree
()
override
;
std
::
string
getName
()
const
override
{
return
merging_params
.
getModeName
()
+
"MergeTree"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
bool
supportsIndexForIn
()
const
override
{
return
true
;
}
...
...
dbms/src/Storages/StorageMySQL.cpp
浏览文件 @
cf5c998b
...
...
@@ -47,8 +47,7 @@ StorageMySQL::StorageMySQL(
const
ColumnsDescription
&
columns_
,
const
ConstraintsDescription
&
constraints_
,
const
Context
&
context_
)
:
table_name
(
table_name_
)
,
database_name
(
database_name_
)
:
IStorage
({
database_name_
,
table_name_
})
,
remote_database_name
(
remote_database_name_
)
,
remote_table_name
(
remote_table_name_
)
,
replace_query
{
replace_query_
}
...
...
dbms/src/Storages/StorageMySQL.h
浏览文件 @
cf5c998b
...
...
@@ -33,8 +33,6 @@ public:
const
Context
&
context_
);
std
::
string
getName
()
const
override
{
return
"MySQL"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
...
...
@@ -48,8 +46,6 @@ public:
private:
friend
class
StorageMySQLBlockOutputStream
;
std
::
string
table_name
;
std
::
string
database_name
;
std
::
string
remote_database_name
;
std
::
string
remote_table_name
;
...
...
dbms/src/Storages/StorageNull.cpp
浏览文件 @
cf5c998b
...
...
@@ -34,15 +34,13 @@ void StorageNull::alter(
const
AlterCommands
&
params
,
const
Context
&
context
,
TableStructureWriteLockHolder
&
table_lock_holder
)
{
lockStructureExclusively
(
table_lock_holder
,
context
.
getCurrentQueryId
());
const
String
current_database_name
=
getDatabaseName
();
const
String
current_table_name
=
getTableName
();
auto
table_id
=
getStorageID
();
ColumnsDescription
new_columns
=
getColumns
();
IndicesDescription
new_indices
=
getIndices
();
ConstraintsDescription
new_constraints
=
getConstraints
();
params
.
applyForColumnsOnly
(
new_columns
);
context
.
getDatabase
(
current_database_name
)
->
alterTable
(
context
,
current_
table_name
,
new_columns
,
new_indices
,
new_constraints
,
{});
context
.
getDatabase
(
table_id
.
database_name
)
->
alterTable
(
context
,
table_id
.
table_name
,
new_columns
,
new_indices
,
new_constraints
,
{});
setColumns
(
std
::
move
(
new_columns
));
}
...
...
dbms/src/Storages/StorageNull.h
浏览文件 @
cf5c998b
...
...
@@ -19,8 +19,6 @@ class StorageNull : public ext::shared_ptr_helper<StorageNull>, public IStorage
friend
struct
ext
::
shared_ptr_helper
<
StorageNull
>
;
public:
std
::
string
getName
()
const
override
{
return
"Null"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
...
...
@@ -38,22 +36,14 @@ public:
return
std
::
make_shared
<
NullBlockOutputStream
>
(
getSampleBlock
());
}
void
rename
(
const
String
&
/*new_path_to_db*/
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
override
{
table_name
=
new_table_name
;
database_name
=
new_database_name
;
}
void
alter
(
const
AlterCommands
&
params
,
const
Context
&
context
,
TableStructureWriteLockHolder
&
table_lock_holder
)
override
;
private:
String
table_name
;
String
database_name
;
protected:
StorageNull
(
String
database_name_
,
String
table_name_
,
ColumnsDescription
columns_description_
,
ConstraintsDescription
constraints_
)
:
table_name
(
std
::
move
(
table_name_
)),
database_name
(
std
::
move
(
database_name_
)
)
:
IStorage
({
database_name_
,
table_name_
}
)
{
setColumns
(
std
::
move
(
columns_description_
));
setConstraints
(
std
::
move
(
constraints_
));
...
...
dbms/src/Storages/StorageReplicatedMergeTree.cpp
浏览文件 @
cf5c998b
...
...
@@ -226,15 +226,15 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
zookeeper_path
=
"/"
+
zookeeper_path
;
replica_path
=
zookeeper_path
+
"/replicas/"
+
replica_name
;
queue_updating_task
=
global_context
.
getSchedulePool
().
createTask
(
database_name
+
"."
+
table_name
+
" (StorageReplicatedMergeTree::queueUpdatingTask)"
,
[
this
]{
queueUpdatingTask
();
});
queue_updating_task
=
global_context
.
getSchedulePool
().
createTask
(
getStorageID
().
getFullTableName
()
+
" (StorageReplicatedMergeTree::queueUpdatingTask)"
,
[
this
]{
queueUpdatingTask
();
});
mutations_updating_task
=
global_context
.
getSchedulePool
().
createTask
(
database_name
+
"."
+
table_name
+
" (StorageReplicatedMergeTree::mutationsUpdatingTask)"
,
[
this
]{
mutationsUpdatingTask
();
});
mutations_updating_task
=
global_context
.
getSchedulePool
().
createTask
(
getStorageID
().
getFullTableName
()
+
" (StorageReplicatedMergeTree::mutationsUpdatingTask)"
,
[
this
]{
mutationsUpdatingTask
();
});
merge_selecting_task
=
global_context
.
getSchedulePool
().
createTask
(
database_name
+
"."
+
table_name
+
" (StorageReplicatedMergeTree::mergeSelectingTask)"
,
[
this
]
{
mergeSelectingTask
();
});
merge_selecting_task
=
global_context
.
getSchedulePool
().
createTask
(
getStorageID
().
getFullTableName
()
+
" (StorageReplicatedMergeTree::mergeSelectingTask)"
,
[
this
]
{
mergeSelectingTask
();
});
/// Will be activated if we win leader election.
merge_selecting_task
->
deactivate
();
mutations_finalizing_task
=
global_context
.
getSchedulePool
().
createTask
(
database_name
+
"."
+
table_name
+
" (StorageReplicatedMergeTree::mutationsFinalizingTask)"
,
[
this
]
{
mutationsFinalizingTask
();
});
mutations_finalizing_task
=
global_context
.
getSchedulePool
().
createTask
(
getStorageID
().
getFullTableName
()
+
" (StorageReplicatedMergeTree::mutationsFinalizingTask)"
,
[
this
]
{
mutationsFinalizingTask
();
});
if
(
global_context
.
hasZooKeeper
())
current_zookeeper
=
global_context
.
getZooKeeper
();
...
...
@@ -557,7 +557,9 @@ void StorageReplicatedMergeTree::setTableStructure(ColumnsDescription new_column
};
}
global_context
.
getDatabase
(
database_name
)
->
alterTable
(
global_context
,
table_name
,
new_columns
,
new_indices
,
new_constraints
,
storage_modifier
);
auto
table_id
=
getStorageID
();
//FIXME
global_context
.
getDatabase
(
table_id
.
database_name
)
->
alterTable
(
global_context
,
table_id
.
table_name
,
new_columns
,
new_indices
,
new_constraints
,
storage_modifier
);
/// Even if the primary/sorting keys didn't change we must reinitialize it
/// because primary key column types might have changed.
...
...
@@ -715,7 +717,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
if
(
insane
&&
!
skip_sanity_checks
)
{
std
::
stringstream
why
;
why
<<
"The local set of parts of table "
<<
database_name
<<
"."
<<
table_name
<<
" doesn't look like the set of parts "
why
<<
"The local set of parts of table "
<<
getStorageID
().
getNameForLogs
()
<<
" doesn't look like the set of parts "
<<
"in ZooKeeper: "
<<
formatReadableQuantity
(
unexpected_parts_rows
)
<<
" rows of "
<<
formatReadableQuantity
(
total_rows_on_filesystem
)
<<
" total rows in filesystem are suspicious."
;
...
...
@@ -1098,7 +1100,8 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
}
future_merged_part
.
updatePath
(
*
this
,
reserved_space
);
MergeList
::
EntryPtr
merge_entry
=
global_context
.
getMergeList
().
insert
(
database_name
,
table_name
,
future_merged_part
);
auto
table_id
=
getStorageID
();
MergeList
::
EntryPtr
merge_entry
=
global_context
.
getMergeList
().
insert
(
table_id
.
database_name
,
table_id
.
table_name
,
future_merged_part
);
Transaction
transaction
(
*
this
);
MutableDataPartPtr
part
;
...
...
@@ -1234,8 +1237,9 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
future_mutated_part
.
name
=
entry
.
new_part_name
;
future_mutated_part
.
updatePath
(
*
this
,
reserved_space
);
auto
table_id
=
getStorageID
();
MergeList
::
EntryPtr
merge_entry
=
global_context
.
getMergeList
().
insert
(
database_name
,
table_name
,
future_mutated_part
);
table_id
.
database_name
,
table_id
.
table_name
,
future_mutated_part
);
Stopwatch
stopwatch
;
...
...
@@ -2946,7 +2950,7 @@ void StorageReplicatedMergeTree::startup()
queue
.
initialize
(
zookeeper_path
,
replica_path
,
database_name
+
"."
+
table_name
+
" (ReplicatedMergeTreeQueue)"
,
getStorageID
().
getFullTableName
()
+
" (ReplicatedMergeTreeQueue)"
,
getDataParts
());
StoragePtr
ptr
=
shared_from_this
();
...
...
@@ -3794,7 +3798,8 @@ void StorageReplicatedMergeTree::checkTableCanBeDropped() const
{
/// Consider only synchronized data
const_cast
<
StorageReplicatedMergeTree
&>
(
*
this
).
recalculateColumnSizes
();
global_context
.
checkTableCanBeDropped
(
database_name
,
table_name
,
getTotalActiveSizeInBytes
());
auto
table_id
=
getStorageID
();
global_context
.
checkTableCanBeDropped
(
table_id
.
database_name
,
table_id
.
table_name
,
getTotalActiveSizeInBytes
());
}
...
...
@@ -3810,7 +3815,8 @@ void StorageReplicatedMergeTree::checkPartitionCanBeDropped(const ASTPtr & parti
for
(
const
auto
&
part
:
parts_to_remove
)
partition_size
+=
part
->
bytes_on_disk
;
global_context
.
checkPartitionCanBeDropped
(
database_name
,
table_name
,
partition_size
);
auto
table_id
=
getStorageID
();
global_context
.
checkPartitionCanBeDropped
(
table_id
.
database_name
,
table_id
.
table_name
,
partition_size
);
}
...
...
@@ -5018,6 +5024,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
ReplicatedMergeTreeLogEntryData
entry
;
{
auto
src_table_id
=
src_data
.
getStorageID
();
entry
.
type
=
ReplicatedMergeTreeLogEntryData
::
REPLACE_RANGE
;
entry
.
source_replica
=
replica_name
;
entry
.
create_time
=
time
(
nullptr
);
...
...
@@ -5025,8 +5032,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
auto
&
entry_replace
=
*
entry
.
replace_range_entry
;
entry_replace
.
drop_range_part_name
=
drop_range_fake_part_name
;
entry_replace
.
from_database
=
src_
data
.
database_name
;
entry_replace
.
from_table
=
src_
data
.
table_name
;
entry_replace
.
from_database
=
src_
table_id
.
database_name
;
entry_replace
.
from_table
=
src_
table_id
.
table_name
;
for
(
const
auto
&
part
:
src_parts
)
entry_replace
.
src_part_names
.
emplace_back
(
part
->
name
);
for
(
const
auto
&
part
:
dst_parts
)
...
...
@@ -5216,13 +5223,14 @@ void StorageReplicatedMergeTree::updatePartHeaderInZooKeeperAndCommit(
ReplicatedMergeTreeAddress
StorageReplicatedMergeTree
::
getReplicatedMergeTreeAddress
()
const
{
auto
host_port
=
global_context
.
getInterserverIOAddress
();
auto
table_id
=
getStorageID
();
ReplicatedMergeTreeAddress
res
;
res
.
host
=
host_port
.
first
;
res
.
replication_port
=
host_port
.
second
;
res
.
queries_port
=
global_context
.
getTCPPort
();
res
.
database
=
database_name
;
res
.
table
=
table_name
;
res
.
database
=
table_id
.
database_name
;
res
.
table
=
table_
id
.
table_
name
;
res
.
scheme
=
global_context
.
getInterserverScheme
();
return
res
;
}
...
...
dbms/src/Storages/StorageReplicatedMergeTree.h
浏览文件 @
cf5c998b
...
...
@@ -83,8 +83,6 @@ public:
~
StorageReplicatedMergeTree
()
override
;
std
::
string
getName
()
const
override
{
return
"Replicated"
+
merging_params
.
getModeName
()
+
"MergeTree"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
bool
supportsReplication
()
const
override
{
return
true
;
}
bool
supportsDeduplication
()
const
override
{
return
true
;
}
...
...
dbms/src/Storages/StorageS3.cpp
浏览文件 @
cf5c998b
...
...
@@ -143,12 +143,10 @@ StorageS3::StorageS3(const S3::URI & uri_,
const
ConstraintsDescription
&
constraints_
,
Context
&
context_
,
const
String
&
compression_method_
=
""
)
:
IStorage
(
columns_
)
:
IStorage
(
{
database_name_
,
table_name_
},
columns_
)
,
uri
(
uri_
)
,
context_global
(
context_
)
,
format_name
(
format_name_
)
,
database_name
(
database_name_
)
,
table_name
(
table_name_
)
,
min_upload_part_size
(
min_upload_part_size_
)
,
compression_method
(
compression_method_
)
,
client
(
S3
::
ClientFactory
::
instance
().
create
(
uri_
.
endpoint
,
access_key_id_
,
secret_access_key_
))
...
...
@@ -184,12 +182,6 @@ BlockInputStreams StorageS3::read(
return
{
std
::
make_shared
<
AddingDefaultsBlockInputStream
>
(
block_input
,
column_defaults
,
context
)};
}
void
StorageS3
::
rename
(
const
String
&
/*new_path_to_db*/
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
{
table_name
=
new_table_name
;
database_name
=
new_database_name
;
}
BlockOutputStreamPtr
StorageS3
::
write
(
const
ASTPtr
&
/*query*/
,
const
Context
&
/*context*/
)
{
return
std
::
make_shared
<
StorageS3BlockOutputStream
>
(
...
...
dbms/src/Storages/StorageS3.h
浏览文件 @
cf5c998b
...
...
@@ -47,11 +47,6 @@ public:
return
getSampleBlock
();
}
String
getTableName
()
const
override
{
return
table_name
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
const
SelectQueryInfo
&
query_info
,
...
...
@@ -62,15 +57,11 @@ public:
BlockOutputStreamPtr
write
(
const
ASTPtr
&
query
,
const
Context
&
context
)
override
;
void
rename
(
const
String
&
new_path_to_db
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
override
;
private:
S3
::
URI
uri
;
const
Context
&
context_global
;
String
format_name
;
String
database_name
;
String
table_name
;
UInt64
min_upload_part_size
;
String
compression_method
;
std
::
shared_ptr
<
Aws
::
S3
::
S3Client
>
client
;
...
...
dbms/src/Storages/StorageSet.cpp
浏览文件 @
cf5c998b
...
...
@@ -94,7 +94,7 @@ StorageSetOrJoinBase::StorageSetOrJoinBase(
const
ColumnsDescription
&
columns_
,
const
ConstraintsDescription
&
constraints_
,
const
Context
&
context_
)
:
table_name
(
table_name_
),
database_name
(
database_name_
)
:
IStorage
({
database_name_
,
table_name_
}
)
{
setColumns
(
columns_
);
setConstraints
(
constraints_
);
...
...
@@ -209,8 +209,7 @@ void StorageSetOrJoinBase::rename(
Poco
::
File
(
path
).
renameTo
(
new_path
);
path
=
new_path
;
table_name
=
new_table_name
;
database_name
=
new_database_name
;
renameInMemory
(
new_database_name
,
new_table_name
);
}
...
...
dbms/src/Storages/StorageSet.h
浏览文件 @
cf5c998b
...
...
@@ -19,9 +19,6 @@ class StorageSetOrJoinBase : public IStorage
friend
class
SetOrJoinBlockOutputStream
;
public:
String
getTableName
()
const
override
{
return
table_name
;
}
String
getDatabaseName
()
const
override
{
return
database_name
;
}
void
rename
(
const
String
&
new_path_to_table_data
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
override
;
BlockOutputStreamPtr
write
(
const
ASTPtr
&
query
,
const
Context
&
context
)
override
;
...
...
@@ -39,8 +36,6 @@ protected:
String
base_path
;
String
path
;
String
table_name
;
String
database_name
;
std
::
atomic
<
UInt64
>
increment
=
0
;
/// For the backup file names.
...
...
dbms/src/Storages/StorageStripeLog.cpp
浏览文件 @
cf5c998b
...
...
@@ -202,7 +202,8 @@ StorageStripeLog::StorageStripeLog(
bool
attach
,
size_t
max_compress_block_size_
,
const
Context
&
context_
)
:
base_path
(
context_
.
getPath
()),
path
(
base_path
+
relative_path_
),
table_name
(
table_name_
),
database_name
(
database_name_
),
:
IStorage
({
database_name_
,
table_name_
}),
base_path
(
context_
.
getPath
()),
path
(
base_path
+
relative_path_
),
max_compress_block_size
(
max_compress_block_size_
),
file_checker
(
path
+
"sizes.json"
),
log
(
&
Logger
::
get
(
"StorageStripeLog"
))
...
...
@@ -227,9 +228,8 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Strin
Poco
::
File
(
path
).
renameTo
(
new_path
);
path
=
new_path
;
table_name
=
new_table_name
;
database_name
=
new_database_name
;
file_checker
.
setPath
(
path
+
"sizes.json"
);
renameInMemory
(
new_database_name
,
new_table_name
);
}
...
...
@@ -292,7 +292,7 @@ CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Conte
void
StorageStripeLog
::
truncate
(
const
ASTPtr
&
,
const
Context
&
,
TableStructureWriteLockHolder
&
)
{
if
(
table_name
.
empty
())
if
(
getStorageID
().
table_name
.
empty
())
//FIXME how can it be empty?
throw
Exception
(
"Logical error: table name is empty"
,
ErrorCodes
::
LOGICAL_ERROR
);
std
::
shared_lock
<
std
::
shared_mutex
>
lock
(
rwlock
);
...
...
dbms/src/Storages/StorageStripeLog.h
浏览文件 @
cf5c998b
...
...
@@ -27,8 +27,6 @@ friend struct ext::shared_ptr_helper<StorageStripeLog>;
public:
std
::
string
getName
()
const
override
{
return
"StripeLog"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
...
...
@@ -60,8 +58,6 @@ public:
private:
String
base_path
;
String
path
;
String
table_name
;
String
database_name
;
size_t
max_compress_block_size
;
...
...
dbms/src/Storages/StorageTinyLog.cpp
浏览文件 @
cf5c998b
...
...
@@ -331,7 +331,7 @@ StorageTinyLog::StorageTinyLog(
bool
attach
,
size_t
max_compress_block_size_
,
const
Context
&
context_
)
:
base_path
(
context_
.
getPath
()),
path
(
base_path
+
relative_path_
),
table_name
(
table_name_
),
database_name
(
database_name
_
),
:
IStorage
({
database_name_
,
table_name_
}),
base_path
(
context_
.
getPath
()),
path
(
base_path
+
relative_path
_
),
max_compress_block_size
(
max_compress_block_size_
),
file_checker
(
path
+
"sizes.json"
),
log
(
&
Logger
::
get
(
"StorageTinyLog"
))
...
...
@@ -382,12 +382,11 @@ void StorageTinyLog::rename(const String & new_path_to_table_data, const String
Poco
::
File
(
path
).
renameTo
(
new_path
);
path
=
new_path
;
table_name
=
new_table_name
;
database_name
=
new_database_name
;
file_checker
.
setPath
(
path
+
"sizes.json"
);
for
(
Files_t
::
iterator
it
=
files
.
begin
();
it
!=
files
.
end
();
++
it
)
it
->
second
.
data_file
=
Poco
::
File
(
path
+
Poco
::
Path
(
it
->
second
.
data_file
.
path
()).
getFileName
());
renameInMemory
(
new_database_name
,
new_table_name
);
}
...
...
@@ -422,7 +421,7 @@ CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context
void
StorageTinyLog
::
truncate
(
const
ASTPtr
&
,
const
Context
&
,
TableStructureWriteLockHolder
&
)
{
if
(
table_name
.
empty
())
if
(
getStorageID
().
table_name
.
empty
())
//FIXME how can it be empty?
throw
Exception
(
"Logical error: table name is empty"
,
ErrorCodes
::
LOGICAL_ERROR
);
std
::
unique_lock
<
std
::
shared_mutex
>
lock
(
rwlock
);
...
...
dbms/src/Storages/StorageTinyLog.h
浏览文件 @
cf5c998b
...
...
@@ -26,8 +26,6 @@ friend struct ext::shared_ptr_helper<StorageTinyLog>;
public:
std
::
string
getName
()
const
override
{
return
"TinyLog"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
...
...
@@ -59,8 +57,6 @@ public:
private:
String
base_path
;
String
path
;
String
table_name
;
String
database_name
;
size_t
max_compress_block_size
;
...
...
@@ -71,7 +67,6 @@ private:
Logger
*
log
;
void
addFile
(
const
String
&
column_name
,
const
IDataType
&
type
,
size_t
level
=
0
);
void
addFiles
(
const
String
&
column_name
,
const
IDataType
&
type
);
protected:
...
...
dbms/src/Storages/StorageURL.cpp
浏览文件 @
cf5c998b
...
...
@@ -36,7 +36,7 @@ IStorageURLBase::IStorageURLBase(
const
ColumnsDescription
&
columns_
,
const
ConstraintsDescription
&
constraints_
,
const
String
&
compression_method_
)
:
uri
(
uri_
),
context_global
(
context_
),
compression_method
(
compression_method_
),
format_name
(
format_name_
),
table_name
(
table_name_
),
database_name
(
database
_name_
)
:
IStorage
({
database_name_
,
table_name_
}),
uri
(
uri_
),
context_global
(
context_
),
compression_method
(
compression_method_
),
format_name
(
format
_name_
)
{
context_global
.
getRemoteHostFilter
().
checkURL
(
uri
);
setColumns
(
columns_
);
...
...
@@ -204,12 +204,6 @@ BlockInputStreams IStorageURLBase::read(const Names & column_names,
return
{
std
::
make_shared
<
AddingDefaultsBlockInputStream
>
(
block_input
,
column_defaults
,
context
)};
}
void
IStorageURLBase
::
rename
(
const
String
&
/*new_path_to_db*/
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
{
table_name
=
new_table_name
;
database_name
=
new_database_name
;
}
BlockOutputStreamPtr
IStorageURLBase
::
write
(
const
ASTPtr
&
/*query*/
,
const
Context
&
/*context*/
)
{
return
std
::
make_shared
<
StorageURLBlockOutputStream
>
(
...
...
dbms/src/Storages/StorageURL.h
浏览文件 @
cf5c998b
...
...
@@ -16,9 +16,6 @@ namespace DB
class
IStorageURLBase
:
public
IStorage
{
public:
String
getTableName
()
const
override
{
return
table_name
;
}
String
getDatabaseName
()
const
override
{
return
database_name
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
const
SelectQueryInfo
&
query_info
,
...
...
@@ -29,8 +26,6 @@ public:
BlockOutputStreamPtr
write
(
const
ASTPtr
&
query
,
const
Context
&
context
)
override
;
void
rename
(
const
String
&
new_path_to_db
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
override
;
protected:
IStorageURLBase
(
const
Poco
::
URI
&
uri_
,
...
...
@@ -48,8 +43,6 @@ protected:
private:
String
format_name
;
String
table_name
;
String
database_name
;
virtual
std
::
string
getReadMethod
()
const
;
...
...
dbms/src/Storages/StorageValues.cpp
浏览文件 @
cf5c998b
...
...
@@ -8,7 +8,7 @@ namespace DB
{
StorageValues
::
StorageValues
(
const
std
::
string
&
database_name_
,
const
std
::
string
&
table_name_
,
const
ColumnsDescription
&
columns_
,
const
Block
&
res_block_
)
:
database_name
(
database_name_
),
table_name
(
table_name_
),
res_block
(
res_block_
)
:
IStorage
({
database_name_
,
table_name_
}
),
res_block
(
res_block_
)
{
setColumns
(
columns_
);
}
...
...
dbms/src/Storages/StorageValues.h
浏览文件 @
cf5c998b
...
...
@@ -14,8 +14,6 @@ class StorageValues : public ext::shared_ptr_helper<StorageValues>, public IStor
friend
struct
ext
::
shared_ptr_helper
<
StorageValues
>
;
public:
std
::
string
getName
()
const
override
{
return
"Values"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
...
...
@@ -26,8 +24,6 @@ public:
unsigned
num_streams
)
override
;
private:
std
::
string
database_name
;
std
::
string
table_name
;
Block
res_block
;
protected:
...
...
dbms/src/Storages/StorageView.cpp
浏览文件 @
cf5c998b
...
...
@@ -31,7 +31,7 @@ StorageView::StorageView(
const
String
&
table_name_
,
const
ASTCreateQuery
&
query
,
const
ColumnsDescription
&
columns_
)
:
table_name
(
table_name_
),
database_name
(
database_name_
)
:
IStorage
({
database_name_
,
table_name_
}
)
{
setColumns
(
columns_
);
...
...
dbms/src/Storages/StorageView.h
浏览文件 @
cf5c998b
...
...
@@ -15,8 +15,6 @@ class StorageView : public ext::shared_ptr_helper<StorageView>, public IStorage
friend
struct
ext
::
shared_ptr_helper
<
StorageView
>
;
public:
std
::
string
getName
()
const
override
{
return
"View"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
/// It is passed inside the query and solved at its level.
bool
supportsSampling
()
const
override
{
return
true
;
}
...
...
@@ -30,15 +28,7 @@ public:
size_t
max_block_size
,
unsigned
num_streams
)
override
;
void
rename
(
const
String
&
/*new_path_to_db*/
,
const
String
&
new_database_name
,
const
String
&
new_table_name
,
TableStructureWriteLockHolder
&
)
override
{
table_name
=
new_table_name
;
database_name
=
new_database_name
;
}
private:
String
table_name
;
String
database_name
;
ASTPtr
inner_query
;
void
replaceTableNameWithSubquery
(
ASTSelectQuery
*
select_query
,
ASTPtr
&
subquery
);
...
...
dbms/src/Storages/System/IStorageSystemOneBlock.h
浏览文件 @
cf5c998b
...
...
@@ -20,14 +20,11 @@ protected:
virtual
void
fillData
(
MutableColumns
&
res_columns
,
const
Context
&
context
,
const
SelectQueryInfo
&
query_info
)
const
=
0
;
public:
IStorageSystemOneBlock
(
const
String
&
name_
)
:
name
(
name_
)
IStorageSystemOneBlock
(
const
String
&
name_
)
:
IStorage
({
"system"
,
name_
}
)
{
setColumns
(
ColumnsDescription
(
Self
::
getNamesAndTypes
()));
}
std
::
string
getTableName
()
const
override
{
return
name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
"system"
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
const
SelectQueryInfo
&
query_info
,
const
Context
&
context
,
...
...
@@ -43,9 +40,6 @@ public:
return
BlockInputStreams
(
1
,
std
::
make_shared
<
OneBlockInputStream
>
(
sample_block
.
cloneWithColumns
(
std
::
move
(
res_columns
))));
}
private:
const
String
name
;
};
}
dbms/src/Storages/System/StorageSystemColumns.cpp
浏览文件 @
cf5c998b
...
...
@@ -22,7 +22,7 @@ namespace ErrorCodes
}
StorageSystemColumns
::
StorageSystemColumns
(
const
std
::
string
&
name_
)
:
name
(
name_
)
:
IStorage
({
"system"
,
name_
}
)
{
setColumns
(
ColumnsDescription
(
{
...
...
dbms/src/Storages/System/StorageSystemColumns.h
浏览文件 @
cf5c998b
...
...
@@ -16,8 +16,6 @@ class StorageSystemColumns : public ext::shared_ptr_helper<StorageSystemColumns>
friend
struct
ext
::
shared_ptr_helper
<
StorageSystemColumns
>
;
public:
std
::
string
getName
()
const
override
{
return
"SystemColumns"
;
}
std
::
string
getTableName
()
const
override
{
return
name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
"system"
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
...
...
@@ -27,9 +25,6 @@ public:
size_t
max_block_size
,
unsigned
num_streams
)
override
;
private:
const
std
::
string
name
;
protected:
StorageSystemColumns
(
const
std
::
string
&
name_
);
};
...
...
dbms/src/Storages/System/StorageSystemDetachedParts.cpp
浏览文件 @
cf5c998b
...
...
@@ -24,11 +24,10 @@ class StorageSystemDetachedParts :
friend
struct
ext
::
shared_ptr_helper
<
StorageSystemDetachedParts
>
;
public:
std
::
string
getName
()
const
override
{
return
"SystemDetachedParts"
;
}
std
::
string
getTableName
()
const
override
{
return
"detached_parts"
;
}
std
::
string
getDatabaseName
()
const
override
{
return
"system"
;
}
protected:
explicit
StorageSystemDetachedParts
()
:
IStorage
({
"system"
,
"detached_parts"
})
{
setColumns
(
ColumnsDescription
{{
{
"database"
,
std
::
make_shared
<
DataTypeString
>
()},
...
...
dbms/src/Storages/System/StorageSystemDisks.cpp
浏览文件 @
cf5c998b
...
...
@@ -10,7 +10,7 @@ namespace ErrorCodes
StorageSystemDisks
::
StorageSystemDisks
(
const
std
::
string
&
name_
)
:
name
(
name_
)
:
IStorage
({
"system"
,
name_
}
)
{
setColumns
(
ColumnsDescription
(
{
...
...
dbms/src/Storages/System/StorageSystemDisks.h
浏览文件 @
cf5c998b
...
...
@@ -19,8 +19,6 @@ class StorageSystemDisks : public ext::shared_ptr_helper<StorageSystemDisks>, pu
friend
struct
ext
::
shared_ptr_helper
<
StorageSystemDisks
>
;
public:
std
::
string
getName
()
const
override
{
return
"SystemDisks"
;
}
std
::
string
getTableName
()
const
override
{
return
name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
"system"
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
...
...
@@ -30,9 +28,6 @@ public:
size_t
max_block_size
,
unsigned
num_streams
)
override
;
private:
const
std
::
string
name
;
protected:
StorageSystemDisks
(
const
std
::
string
&
name_
);
};
...
...
dbms/src/Storages/System/StorageSystemNumbers.cpp
浏览文件 @
cf5c998b
...
...
@@ -104,7 +104,7 @@ private:
StorageSystemNumbers
::
StorageSystemNumbers
(
const
std
::
string
&
name_
,
bool
multithreaded_
,
std
::
optional
<
UInt64
>
limit_
,
UInt64
offset_
,
bool
even_distribution_
)
:
name
(
name_
),
multithreaded
(
multithreaded_
),
even_distribution
(
even_distribution_
),
limit
(
limit_
),
offset
(
offset_
)
:
IStorage
({
"system"
,
name_
}
),
multithreaded
(
multithreaded_
),
even_distribution
(
even_distribution_
),
limit
(
limit_
),
offset
(
offset_
)
{
setColumns
(
ColumnsDescription
({{
"number"
,
std
::
make_shared
<
DataTypeUInt64
>
()}}));
}
...
...
dbms/src/Storages/System/StorageSystemNumbers.h
浏览文件 @
cf5c998b
...
...
@@ -28,8 +28,6 @@ class StorageSystemNumbers : public ext::shared_ptr_helper<StorageSystemNumbers>
friend
struct
ext
::
shared_ptr_helper
<
StorageSystemNumbers
>
;
public:
std
::
string
getName
()
const
override
{
return
"SystemNumbers"
;
}
std
::
string
getTableName
()
const
override
{
return
name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
"system"
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
...
...
@@ -40,7 +38,6 @@ public:
unsigned
num_streams
)
override
;
private:
const
std
::
string
name
;
bool
multithreaded
;
bool
even_distribution
;
std
::
optional
<
UInt64
>
limit
;
...
...
dbms/src/Storages/System/StorageSystemOne.cpp
浏览文件 @
cf5c998b
...
...
@@ -11,7 +11,7 @@ namespace DB
StorageSystemOne
::
StorageSystemOne
(
const
std
::
string
&
name_
)
:
name
(
name_
)
:
IStorage
({
"system"
,
name_
}
)
{
setColumns
(
ColumnsDescription
({{
"dummy"
,
std
::
make_shared
<
DataTypeUInt8
>
()}}));
}
...
...
dbms/src/Storages/System/StorageSystemOne.h
浏览文件 @
cf5c998b
...
...
@@ -20,8 +20,6 @@ class StorageSystemOne : public ext::shared_ptr_helper<StorageSystemOne>, public
friend
struct
ext
::
shared_ptr_helper
<
StorageSystemOne
>
;
public:
std
::
string
getName
()
const
override
{
return
"SystemOne"
;
}
std
::
string
getTableName
()
const
override
{
return
name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
"system"
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
...
...
@@ -31,9 +29,6 @@ public:
size_t
max_block_size
,
unsigned
num_streams
)
override
;
private:
const
std
::
string
name
;
protected:
StorageSystemOne
(
const
std
::
string
&
name_
);
};
...
...
dbms/src/Storages/System/StorageSystemPartsBase.cpp
浏览文件 @
cf5c998b
...
...
@@ -257,7 +257,7 @@ bool StorageSystemPartsBase::hasColumn(const String & column_name) const
}
StorageSystemPartsBase
::
StorageSystemPartsBase
(
std
::
string
name_
,
NamesAndTypesList
&&
columns_
)
:
name
(
std
::
move
(
name_
)
)
:
IStorage
({
"system"
,
name_
}
)
{
ColumnsDescription
tmp_columns
(
std
::
move
(
columns_
));
...
...
dbms/src/Storages/System/StorageSystemPartsBase.h
浏览文件 @
cf5c998b
...
...
@@ -53,9 +53,6 @@ private:
class
StorageSystemPartsBase
:
public
IStorage
{
public:
std
::
string
getTableName
()
const
override
{
return
name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
"system"
;
}
NameAndTypePair
getColumn
(
const
String
&
column_name
)
const
override
;
bool
hasColumn
(
const
String
&
column_name
)
const
override
;
...
...
@@ -69,8 +66,6 @@ public:
unsigned
num_streams
)
override
;
private:
const
std
::
string
name
;
bool
hasStateColumn
(
const
Names
&
column_names
)
const
;
protected:
...
...
dbms/src/Storages/System/StorageSystemReplicas.cpp
浏览文件 @
cf5c998b
...
...
@@ -15,7 +15,7 @@ namespace DB
StorageSystemReplicas
::
StorageSystemReplicas
(
const
std
::
string
&
name_
)
:
name
(
name_
)
:
IStorage
({
"system"
,
name_
}
)
{
setColumns
(
ColumnsDescription
({
{
"database"
,
std
::
make_shared
<
DataTypeString
>
()
},
...
...
dbms/src/Storages/System/StorageSystemReplicas.h
浏览文件 @
cf5c998b
...
...
@@ -17,8 +17,6 @@ class StorageSystemReplicas : public ext::shared_ptr_helper<StorageSystemReplica
friend
struct
ext
::
shared_ptr_helper
<
StorageSystemReplicas
>
;
public:
std
::
string
getName
()
const
override
{
return
"SystemReplicas"
;
}
std
::
string
getTableName
()
const
override
{
return
name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
"system"
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
...
...
@@ -28,9 +26,6 @@ public:
size_t
max_block_size
,
unsigned
num_streams
)
override
;
private:
const
std
::
string
name
;
protected:
StorageSystemReplicas
(
const
std
::
string
&
name_
);
};
...
...
dbms/src/Storages/System/StorageSystemStoragePolicies.cpp
浏览文件 @
cf5c998b
...
...
@@ -13,7 +13,7 @@ namespace ErrorCodes
StorageSystemStoragePolicies
::
StorageSystemStoragePolicies
(
const
std
::
string
&
name_
)
:
name
(
name_
)
:
IStorage
({
"system"
,
name_
}
)
{
setColumns
(
ColumnsDescription
({
...
...
dbms/src/Storages/System/StorageSystemStoragePolicies.h
浏览文件 @
cf5c998b
...
...
@@ -19,8 +19,6 @@ class StorageSystemStoragePolicies : public ext::shared_ptr_helper<StorageSystem
friend
struct
ext
::
shared_ptr_helper
<
StorageSystemStoragePolicies
>
;
public:
std
::
string
getName
()
const
override
{
return
"SystemStoragePolicies"
;
}
std
::
string
getTableName
()
const
override
{
return
name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
"system"
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
...
...
@@ -30,9 +28,6 @@ public:
size_t
max_block_size
,
unsigned
num_streams
)
override
;
private:
const
std
::
string
name
;
protected:
StorageSystemStoragePolicies
(
const
std
::
string
&
name_
);
};
...
...
dbms/src/Storages/System/StorageSystemTables.cpp
浏览文件 @
cf5c998b
...
...
@@ -26,7 +26,7 @@ namespace ErrorCodes
StorageSystemTables
::
StorageSystemTables
(
const
std
::
string
&
name_
)
:
name
(
name_
)
:
IStorage
({
"system"
,
name_
}
)
{
setColumns
(
ColumnsDescription
(
{
...
...
dbms/src/Storages/System/StorageSystemTables.h
浏览文件 @
cf5c998b
...
...
@@ -17,8 +17,6 @@ class StorageSystemTables : public ext::shared_ptr_helper<StorageSystemTables>,
friend
struct
ext
::
shared_ptr_helper
<
StorageSystemTables
>
;
public:
std
::
string
getName
()
const
override
{
return
"SystemTables"
;
}
std
::
string
getTableName
()
const
override
{
return
name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
"system"
;
}
BlockInputStreams
read
(
const
Names
&
column_names
,
...
...
@@ -28,9 +26,6 @@ public:
size_t
max_block_size
,
unsigned
num_streams
)
override
;
private:
const
std
::
string
name
;
protected:
StorageSystemTables
(
const
std
::
string
&
name_
);
};
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录