Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
0984363b
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
0984363b
编写于
5月 28, 2015
作者:
A
Alexey Milovidov
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
dbms: removed passing DataTypeFactory everywhere [#METR-16545].
上级
ef54e74f
变更
41
隐藏空白更改
内联
并排
Showing
41 changed file
with
97 addition
and
108 deletion
+97
-108
dbms/include/DB/Client/Connection.h
dbms/include/DB/Client/Connection.h
+1
-6
dbms/include/DB/Client/ConnectionPool.h
dbms/include/DB/Client/ConnectionPool.h
+2
-5
dbms/include/DB/Common/ExternalTable.h
dbms/include/DB/Common/ExternalTable.h
+4
-2
dbms/include/DB/Core/NamesAndTypes.h
dbms/include/DB/Core/NamesAndTypes.h
+2
-3
dbms/include/DB/DataStreams/FormatFactory.h
dbms/include/DB/DataStreams/FormatFactory.h
+2
-4
dbms/include/DB/DataStreams/MergeSortingBlockInputStream.h
dbms/include/DB/DataStreams/MergeSortingBlockInputStream.h
+4
-5
dbms/include/DB/DataStreams/NativeBlockInputStream.h
dbms/include/DB/DataStreams/NativeBlockInputStream.h
+2
-4
dbms/include/DB/Dictionaries/ClickHouseDictionarySource.h
dbms/include/DB/Dictionaries/ClickHouseDictionarySource.h
+2
-2
dbms/include/DB/Dictionaries/FileDictionarySource.h
dbms/include/DB/Dictionaries/FileDictionarySource.h
+1
-1
dbms/include/DB/Interpreters/Cluster.h
dbms/include/DB/Interpreters/Cluster.h
+3
-5
dbms/include/DB/Interpreters/Context.h
dbms/include/DB/Interpreters/Context.h
+0
-1
dbms/include/DB/Interpreters/InterpreterAlterQuery.h
dbms/include/DB/Interpreters/InterpreterAlterQuery.h
+1
-1
dbms/include/DB/Storages/ColumnsDescription.h
dbms/include/DB/Storages/ColumnsDescription.h
+1
-1
dbms/include/DB/Storages/Distributed/DirectoryMonitor.h
dbms/include/DB/Storages/Distributed/DirectoryMonitor.h
+1
-1
dbms/include/DB/Storages/MergeTree/MergeTreeData.h
dbms/include/DB/Storages/MergeTree/MergeTreeData.h
+1
-1
dbms/include/DB/Storages/MergeTree/MergeTreePartChecker.h
dbms/include/DB/Storages/MergeTree/MergeTreePartChecker.h
+1
-2
dbms/include/DB/Storages/StorageSet.h
dbms/include/DB/Storages/StorageSet.h
+1
-1
dbms/include/DB/TableFunctions/TableFunctionRemote.h
dbms/include/DB/TableFunctions/TableFunctionRemote.h
+5
-2
dbms/src/Client/Benchmark.cpp
dbms/src/Client/Benchmark.cpp
+1
-2
dbms/src/Client/Client.cpp
dbms/src/Client/Client.cpp
+2
-2
dbms/src/Client/Connection.cpp
dbms/src/Client/Connection.cpp
+7
-7
dbms/src/Core/NamesAndTypes.cpp
dbms/src/Core/NamesAndTypes.cpp
+7
-3
dbms/src/DataStreams/FormatFactory.cpp
dbms/src/DataStreams/FormatFactory.cpp
+2
-2
dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
+1
-1
dbms/src/DataStreams/NativeBlockInputStream.cpp
dbms/src/DataStreams/NativeBlockInputStream.cpp
+3
-0
dbms/src/DataStreams/tests/native_streams.cpp
dbms/src/DataStreams/tests/native_streams.cpp
+0
-3
dbms/src/DataStreams/tests/sorting_stream.cpp
dbms/src/DataStreams/tests/sorting_stream.cpp
+1
-2
dbms/src/Interpreters/Cluster.cpp
dbms/src/Interpreters/Cluster.cpp
+7
-7
dbms/src/Interpreters/Context.cpp
dbms/src/Interpreters/Context.cpp
+1
-4
dbms/src/Interpreters/InterpreterAlterQuery.cpp
dbms/src/Interpreters/InterpreterAlterQuery.cpp
+5
-2
dbms/src/Interpreters/InterpreterCreateQuery.cpp
dbms/src/Interpreters/InterpreterCreateQuery.cpp
+5
-1
dbms/src/Interpreters/InterpreterInsertQuery.cpp
dbms/src/Interpreters/InterpreterInsertQuery.cpp
+1
-2
dbms/src/Interpreters/InterpreterSelectQuery.cpp
dbms/src/Interpreters/InterpreterSelectQuery.cpp
+1
-1
dbms/src/Interpreters/tests/expression_analyzer.cpp
dbms/src/Interpreters/tests/expression_analyzer.cpp
+2
-1
dbms/src/Server/TCPHandler.cpp
dbms/src/Server/TCPHandler.cpp
+0
-1
dbms/src/Storages/ColumnsDescription.cpp
dbms/src/Storages/ColumnsDescription.cpp
+3
-1
dbms/src/Storages/MergeTree/MergeTreeData.cpp
dbms/src/Storages/MergeTree/MergeTreeData.cpp
+1
-1
dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp
dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp
+5
-7
dbms/src/Storages/StorageReplicatedMergeTree.cpp
dbms/src/Storages/StorageReplicatedMergeTree.cpp
+4
-5
dbms/src/Storages/StorageSet.cpp
dbms/src/Storages/StorageSet.cpp
+3
-5
dbms/src/Storages/tests/part_checker.cpp
dbms/src/Storages/tests/part_checker.cpp
+1
-1
未找到文件。
dbms/include/DB/Client/Connection.h
浏览文件 @
0984363b
...
...
@@ -12,8 +12,6 @@
#include <DB/Core/Protocol.h>
#include <DB/Core/QueryProcessingStage.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/DataStreams/BlockStreamProfileInfo.h>
...
...
@@ -50,7 +48,6 @@ class Connection : private boost::noncopyable
public:
Connection
(
const
String
&
host_
,
UInt16
port_
,
const
String
&
default_database_
,
const
String
&
user_
,
const
String
&
password_
,
const
DataTypeFactory
&
data_type_factory_
,
const
String
&
client_name_
=
"client"
,
Protocol
::
Compression
::
Enum
compression_
=
Protocol
::
Compression
::
Enable
,
Poco
::
Timespan
connect_timeout_
=
Poco
::
Timespan
(
DBMS_DEFAULT_CONNECT_TIMEOUT_SEC
,
0
),
...
...
@@ -61,7 +58,7 @@ public:
host
(
host_
),
port
(
port_
),
default_database
(
default_database_
),
user
(
user_
),
password
(
password_
),
client_name
(
client_name_
),
compression
(
compression_
),
data_type_factory
(
data_type_factory_
),
compression
(
compression_
),
connect_timeout
(
connect_timeout_
),
receive_timeout
(
receive_timeout_
),
send_timeout
(
send_timeout_
),
ping_timeout
(
ping_timeout_
),
log_wrapper
(
host
,
port
)
...
...
@@ -172,8 +169,6 @@ private:
/// каким алгоритмом сжимать данные при INSERT и данные внешних таблиц
CompressionMethod
network_compression_method
=
CompressionMethod
::
LZ4
;
const
DataTypeFactory
&
data_type_factory
;
/** Если не nullptr, то используется, чтобы ограничить сетевой трафик.
* Учитывается только трафик при передаче блоков. Другие пакеты не учитываются.
*/
...
...
dbms/include/DB/Client/ConnectionPool.h
浏览文件 @
0984363b
...
...
@@ -56,7 +56,6 @@ public:
ConnectionPool
(
unsigned
max_connections_
,
const
String
&
host_
,
UInt16
port_
,
const
String
&
default_database_
,
const
String
&
user_
,
const
String
&
password_
,
const
DataTypeFactory
&
data_type_factory_
,
const
String
&
client_name_
=
"client"
,
Protocol
::
Compression
::
Enum
compression_
=
Protocol
::
Compression
::
Enable
,
Poco
::
Timespan
connect_timeout_
=
Poco
::
Timespan
(
DBMS_DEFAULT_CONNECT_TIMEOUT_SEC
,
0
),
...
...
@@ -65,7 +64,7 @@ public:
:
Base
(
max_connections_
,
&
Logger
::
get
(
"ConnectionPool ("
+
Poco
::
Net
::
SocketAddress
(
host_
,
port_
).
toString
()
+
")"
)),
host
(
host_
),
port
(
port_
),
default_database
(
default_database_
),
user
(
user_
),
password
(
password_
),
client_name
(
client_name_
),
compression
(
compression_
),
data_type_factory
(
data_type_factory_
),
client_name
(
client_name_
),
compression
(
compression_
),
connect_timeout
(
connect_timeout_
),
receive_timeout
(
receive_timeout_
),
send_timeout
(
send_timeout_
)
{
}
...
...
@@ -91,7 +90,7 @@ protected:
{
return
new
Connection
(
host
,
port
,
default_database
,
user
,
password
,
data_type_factory
,
client_name
,
compression
,
client_name
,
compression
,
connect_timeout
,
receive_timeout
,
send_timeout
);
}
...
...
@@ -105,8 +104,6 @@ private:
String
client_name
;
Protocol
::
Compression
::
Enum
compression
;
/// Сжимать ли данные при взаимодействии с сервером.
const
DataTypeFactory
&
data_type_factory
;
Poco
::
Timespan
connect_timeout
;
Poco
::
Timespan
receive_timeout
;
Poco
::
Timespan
send_timeout
;
...
...
dbms/include/DB/Common/ExternalTable.h
浏览文件 @
0984363b
...
...
@@ -42,11 +42,13 @@ public:
/// Инициализировать sample_block по структуре таблицы сохраненной в structure
virtual
void
initSampleBlock
(
const
Context
&
context
)
{
const
DataTypeFactory
&
data_type_factory
=
DataTypeFactory
::
instance
();
for
(
size_t
i
=
0
;
i
<
structure
.
size
();
++
i
)
{
ColumnWithNameAndType
column
;
column
.
name
=
structure
[
i
].
first
;
column
.
type
=
context
.
getDataTypeFactory
()
.
get
(
structure
[
i
].
second
);
column
.
type
=
data_type_factory
.
get
(
structure
[
i
].
second
);
column
.
column
=
column
.
type
->
createColumn
();
sample_block
.
insert
(
column
);
}
...
...
@@ -58,7 +60,7 @@ public:
initReadBuffer
();
initSampleBlock
(
context
);
ExternalTableData
res
=
std
::
make_pair
(
new
AsynchronousBlockInputStream
(
context
.
getFormatFactory
().
getInput
(
format
,
*
read_buffer
,
sample_block
,
DEFAULT_BLOCK_SIZE
,
context
.
getDataTypeFactory
()
)),
name
);
format
,
*
read_buffer
,
sample_block
,
DEFAULT_BLOCK_SIZE
)),
name
);
return
res
;
}
...
...
dbms/include/DB/Core/NamesAndTypes.h
浏览文件 @
0984363b
...
...
@@ -9,7 +9,6 @@
#include <sparsehash/dense_hash_map>
#include <DB/DataTypes/IDataType.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/IO/ReadBufferFromString.h>
#include "Names.h"
...
...
@@ -45,11 +44,11 @@ class NamesAndTypesList : public std::list<NameAndTypePair>
public:
using
std
::
list
<
NameAndTypePair
>::
list
;
void
readText
(
ReadBuffer
&
buf
,
const
DataTypeFactory
&
data_type_factory
);
void
readText
(
ReadBuffer
&
buf
);
void
writeText
(
WriteBuffer
&
buf
)
const
;
String
toString
()
const
;
static
NamesAndTypesList
parse
(
const
String
&
s
,
const
DataTypeFactory
&
data_type_factory
);
static
NamesAndTypesList
parse
(
const
String
&
s
);
/// Все элементы rhs должны быть различны.
bool
isSubsetOf
(
const
NamesAndTypesList
&
rhs
)
const
;
...
...
dbms/include/DB/DataStreams/FormatFactory.h
浏览文件 @
0984363b
#pragma once
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
...
...
@@ -16,8 +14,8 @@ class FormatFactory
{
public:
BlockInputStreamPtr
getInput
(
const
String
&
name
,
ReadBuffer
&
buf
,
Block
&
sample
,
size_t
max_block_size
,
const
DataTypeFactory
&
data_type_factory
)
const
;
Block
&
sample
,
size_t
max_block_size
)
const
;
BlockOutputStreamPtr
getOutput
(
const
String
&
name
,
WriteBuffer
&
buf
,
Block
&
sample
)
const
;
};
...
...
dbms/include/DB/DataStreams/MergeSortingBlockInputStream.h
浏览文件 @
0984363b
...
...
@@ -66,9 +66,9 @@ public:
/// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке.
MergeSortingBlockInputStream
(
BlockInputStreamPtr
input_
,
SortDescription
&
description_
,
size_t
max_merged_block_size_
,
size_t
limit_
,
size_t
max_bytes_before_external_sort_
,
const
std
::
string
&
tmp_path_
,
const
DataTypeFactory
&
data_type_factory_
)
size_t
max_bytes_before_external_sort_
,
const
std
::
string
&
tmp_path_
)
:
description
(
description_
),
max_merged_block_size
(
max_merged_block_size_
),
limit
(
limit_
),
max_bytes_before_external_sort
(
max_bytes_before_external_sort_
),
tmp_path
(
tmp_path_
)
,
data_type_factory
(
data_type_factory_
)
max_bytes_before_external_sort
(
max_bytes_before_external_sort_
),
tmp_path
(
tmp_path_
)
{
children
.
push_back
(
input_
);
}
...
...
@@ -97,7 +97,6 @@ private:
size_t
max_bytes_before_external_sort
;
const
std
::
string
tmp_path
;
const
DataTypeFactory
&
data_type_factory
;
Logger
*
log
=
&
Logger
::
get
(
"MergeSortingBlockInputStream"
);
...
...
@@ -115,8 +114,8 @@ private:
CompressedReadBuffer
compressed_in
;
BlockInputStreamPtr
block_in
;
TemporaryFileStream
(
const
std
::
string
&
path
,
const
DataTypeFactory
&
data_type_factory
)
:
file_in
(
path
),
compressed_in
(
file_in
),
block_in
(
new
NativeBlockInputStream
(
compressed_in
,
data_type_factory
))
{}
TemporaryFileStream
(
const
std
::
string
&
path
)
:
file_in
(
path
),
compressed_in
(
file_in
),
block_in
(
new
NativeBlockInputStream
(
compressed_in
))
{}
};
std
::
vector
<
std
::
unique_ptr
<
TemporaryFileStream
>>
temporary_inputs
;
...
...
dbms/include/DB/DataStreams/NativeBlockInputStream.h
浏览文件 @
0984363b
#pragma once
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
...
...
@@ -16,8 +15,8 @@ public:
/** В случае указания ненулевой server_revision, может ожидаться и считываться дополнительная информация о блоке,
* в зависимости от поддерживаемой для указанной ревизии.
*/
NativeBlockInputStream
(
ReadBuffer
&
istr_
,
const
DataTypeFactory
&
data_type_factory_
,
UInt64
server_revision_
=
0
)
:
istr
(
istr_
),
data_type_factory
(
data_type_factory_
),
server_revision
(
server_revision_
)
{}
NativeBlockInputStream
(
ReadBuffer
&
istr_
,
UInt64
server_revision_
=
0
)
:
istr
(
istr_
),
server_revision
(
server_revision_
)
{}
String
getName
()
const
override
{
return
"NativeBlockInputStream"
;
}
...
...
@@ -35,7 +34,6 @@ protected:
private:
ReadBuffer
&
istr
;
const
DataTypeFactory
&
data_type_factory
;
UInt64
server_revision
;
};
...
...
dbms/include/DB/Dictionaries/ClickHouseDictionarySource.h
浏览文件 @
0984363b
...
...
@@ -35,7 +35,7 @@ public:
sample_block
{
sample_block
},
context
(
context
),
is_local
{
isLocalAddress
({
host
,
port
})},
pool
{
is_local
?
nullptr
:
std
::
make_unique
<
ConnectionPool
>
(
max_connections
,
host
,
port
,
db
,
user
,
password
,
context
.
getDataTypeFactory
(),
max_connections
,
host
,
port
,
db
,
user
,
password
,
"ClickHouseDictionarySource"
)
},
load_all_query
{
composeLoadAllQuery
()}
...
...
@@ -50,7 +50,7 @@ public:
sample_block
{
other
.
sample_block
},
context
(
other
.
context
),
is_local
{
other
.
is_local
},
pool
{
is_local
?
nullptr
:
std
::
make_unique
<
ConnectionPool
>
(
max_connections
,
host
,
port
,
db
,
user
,
password
,
context
.
getDataTypeFactory
(),
max_connections
,
host
,
port
,
db
,
user
,
password
,
"ClickHouseDictionarySource"
)},
load_all_query
{
other
.
load_all_query
}
{}
...
...
dbms/include/DB/Dictionaries/FileDictionarySource.h
浏览文件 @
0984363b
...
...
@@ -34,7 +34,7 @@ public:
{
auto
in_ptr
=
std
::
make_unique
<
ReadBufferFromFile
>
(
filename
);
auto
stream
=
context
.
getFormatFactory
().
getInput
(
format
,
*
in_ptr
,
sample_block
,
max_block_size
,
context
.
getDataTypeFactory
()
);
format
,
*
in_ptr
,
sample_block
,
max_block_size
);
last_modification
=
getLastModification
();
return
new
OwningBufferBlockInputStream
{
stream
,
std
::
move
(
in_ptr
)};
...
...
dbms/include/DB/Interpreters/Cluster.h
浏览文件 @
0984363b
...
...
@@ -2,7 +2,6 @@
#include <map>
#include <DB/Interpreters/Settings.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/Client/ConnectionPool.h>
#include <DB/Client/ConnectionPoolWithFailover.h>
#include <Poco/Net/SocketAddress.h>
...
...
@@ -16,10 +15,10 @@ namespace DB
class
Cluster
:
private
boost
::
noncopyable
{
public:
Cluster
(
const
Settings
&
settings
,
const
DataTypeFactory
&
data_type_factory
,
const
String
&
cluster_name
);
Cluster
(
const
Settings
&
settings
,
const
String
&
cluster_name
);
/// Построить кластер по именам шардов и реплик. Локальные обрабатываются так же как удаленные.
Cluster
(
const
Settings
&
settings
,
const
DataTypeFactory
&
data_type_factory
,
std
::
vector
<
std
::
vector
<
String
>>
names
,
Cluster
(
const
Settings
&
settings
,
std
::
vector
<
std
::
vector
<
String
>>
names
,
const
String
&
username
,
const
String
&
password
);
/// количество узлов clickhouse сервера, расположенных локально
...
...
@@ -98,8 +97,7 @@ struct Clusters
typedef
std
::
map
<
String
,
Cluster
>
Impl
;
Impl
impl
;
Clusters
(
const
Settings
&
settings
,
const
DataTypeFactory
&
data_type_factory
,
const
String
&
config_name
=
"remote_servers"
);
Clusters
(
const
Settings
&
settings
,
const
String
&
config_name
=
"remote_servers"
);
};
}
dbms/include/DB/Interpreters/Context.h
浏览文件 @
0984363b
...
...
@@ -160,7 +160,6 @@ public:
const
TableFunctionFactory
&
getTableFunctionFactory
()
const
;
const
AggregateFunctionFactory
&
getAggregateFunctionFactory
()
const
;
const
DataTypeFactory
&
getDataTypeFactory
()
const
;
const
FormatFactory
&
getFormatFactory
()
const
;
const
Dictionaries
&
getDictionaries
()
const
;
const
ExternalDictionaries
&
getExternalDictionaries
()
const
;
...
...
dbms/include/DB/Interpreters/InterpreterAlterQuery.h
浏览文件 @
0984363b
...
...
@@ -77,7 +77,7 @@ private:
Context
context
;
static
void
parseAlter
(
const
ASTAlterQuery
::
ParameterContainer
&
params
,
const
DataTypeFactory
&
data_type_factory
,
static
void
parseAlter
(
const
ASTAlterQuery
::
ParameterContainer
&
params
,
AlterCommands
&
out_alter_commands
,
PartitionCommands
&
out_partition_commands
);
};
...
...
dbms/include/DB/Storages/ColumnsDescription.h
浏览文件 @
0984363b
...
...
@@ -21,7 +21,7 @@ struct ColumnsDescription
String
toString
()
const
;
static
ColumnsDescription
parse
(
const
String
&
str
,
const
DataTypeFactory
&
data_type_factory
);
static
ColumnsDescription
parse
(
const
String
&
str
);
};
...
...
dbms/include/DB/Storages/Distributed/DirectoryMonitor.h
浏览文件 @
0984363b
...
...
@@ -126,7 +126,7 @@ private:
const
std
::
string
&
user
,
const
std
::
string
&
password
)
{
return
new
ConnectionPool
{
1
,
host
,
port
,
""
,
user
,
password
,
storage
.
context
.
getDataTypeFactory
(),
user
,
password
,
storage
.
getName
()
+
'_'
+
name
};
};
...
...
dbms/include/DB/Storages/MergeTree/MergeTreeData.h
浏览文件 @
0984363b
...
...
@@ -477,7 +477,7 @@ public:
}
ReadBufferFromFile
file
(
path
,
std
::
min
(
static_cast
<
size_t
>
(
DBMS_DEFAULT_BUFFER_SIZE
),
Poco
::
File
(
path
).
getSize
()));
columns
.
readText
(
file
,
storage
.
context
.
getDataTypeFactory
()
);
columns
.
readText
(
file
);
}
void
checkNotBroken
(
bool
require_part_metadata
)
...
...
dbms/include/DB/Storages/MergeTree/MergeTreePartChecker.h
浏览文件 @
0984363b
...
...
@@ -28,8 +28,7 @@ public:
* - Проверяет правильность засечек.
* Бросает исключение, если кусок испорчен или если проверить не получилось (TODO: можно попробовать разделить эти случаи).
*/
static
void
checkDataPart
(
String
path
,
const
Settings
&
settings
,
const
DataTypeFactory
&
data_type_factory
,
MergeTreeData
::
DataPart
::
Checksums
*
out_checksums
=
nullptr
);
static
void
checkDataPart
(
String
path
,
const
Settings
&
settings
,
MergeTreeData
::
DataPart
::
Checksums
*
out_checksums
=
nullptr
);
};
}
dbms/include/DB/Storages/StorageSet.h
浏览文件 @
0984363b
...
...
@@ -45,7 +45,7 @@ protected:
void
restore
();
private:
void
restoreFromFile
(
const
String
&
file_path
,
const
DataTypeFactory
&
data_type_factory
);
void
restoreFromFile
(
const
String
&
file_path
);
/// Вставить блок в состояние.
virtual
void
insertBlock
(
const
Block
&
block
)
=
0
;
...
...
dbms/include/DB/TableFunctions/TableFunctionRemote.h
浏览文件 @
0984363b
...
...
@@ -3,6 +3,7 @@
#include <DB/TableFunctions/ITableFunction.h>
#include <DB/Storages/StorageDistributed.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/DataStreams/RemoteBlockInputStream.h>
#include <DB/Interpreters/reinterpretAsIdentifier.h>
#include <DB/Interpreters/Cluster.h>
...
...
@@ -117,7 +118,7 @@ public:
if
(
names
.
empty
())
throw
Exception
(
"Shard list is empty after parsing first argument"
,
ErrorCodes
::
BAD_ARGUMENTS
);
SharedPtr
<
Cluster
>
cluster
=
new
Cluster
(
context
.
getSettings
(),
context
.
getDataTypeFactory
(),
names
,
username
,
password
);
SharedPtr
<
Cluster
>
cluster
=
new
Cluster
(
context
.
getSettings
(),
names
,
username
,
password
);
return
StorageDistributed
::
create
(
getName
(),
chooseColumns
(
*
cluster
,
remote_database
,
remote_table
,
context
),
remote_database
,
remote_table
,
cluster
,
context
);
...
...
@@ -140,6 +141,8 @@ private:
};
input
->
readPrefix
();
const
DataTypeFactory
&
data_type_factory
=
DataTypeFactory
::
instance
();
while
(
true
)
{
Block
current
=
input
->
read
();
...
...
@@ -153,7 +156,7 @@ private:
String
column_name
=
(
*
name
)[
i
].
get
<
const
String
&>
();
String
data_type_name
=
(
*
type
)[
i
].
get
<
const
String
&>
();
res
.
emplace_back
(
column_name
,
context
.
getDataTypeFactory
()
.
get
(
data_type_name
));
res
.
emplace_back
(
column_name
,
data_type_factory
.
get
(
data_type_name
));
}
}
...
...
dbms/src/Client/Benchmark.cpp
浏览文件 @
0984363b
...
...
@@ -52,7 +52,7 @@ public:
const
String
&
host_
,
UInt16
port_
,
const
String
&
default_database_
,
const
String
&
user_
,
const
String
&
password_
,
const
Settings
&
settings_
)
:
concurrency
(
concurrency_
),
delay
(
delay_
),
queue
(
concurrency
),
connections
(
concurrency
,
host_
,
port_
,
default_database_
,
user_
,
password_
,
data_type_factory
),
connections
(
concurrency
,
host_
,
port_
,
default_database_
,
user_
,
password_
),
settings
(
settings_
),
pool
(
concurrency
)
{
std
::
cerr
<<
std
::
fixed
<<
std
::
setprecision
(
3
);
...
...
@@ -73,7 +73,6 @@ private:
typedef
ConcurrentBoundedQueue
<
Query
>
Queue
;
Queue
queue
;
DataTypeFactory
data_type_factory
;
ConnectionPool
connections
;
Settings
settings
;
...
...
dbms/src/Client/Client.cpp
浏览文件 @
0984363b
...
...
@@ -336,7 +336,7 @@ private:
<<
(
!
user
.
empty
()
?
" as user "
+
user
:
""
)
<<
"."
<<
std
::
endl
;
connection
=
new
Connection
(
host
,
port
,
default_database
,
user
,
password
,
context
.
getDataTypeFactory
(),
"client"
,
compression
,
connection
=
new
Connection
(
host
,
port
,
default_database
,
user
,
password
,
"client"
,
compression
,
Poco
::
Timespan
(
config
().
getInt
(
"connect_timeout"
,
DBMS_DEFAULT_CONNECT_TIMEOUT_SEC
),
0
),
Poco
::
Timespan
(
config
().
getInt
(
"receive_timeout"
,
DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC
),
0
),
Poco
::
Timespan
(
config
().
getInt
(
"send_timeout"
,
DBMS_DEFAULT_SEND_TIMEOUT_SEC
),
0
));
...
...
@@ -698,7 +698,7 @@ private:
current_format
=
insert
->
format
;
BlockInputStreamPtr
block_input
=
context
.
getFormatFactory
().
getInput
(
current_format
,
buf
,
sample
,
insert_format_max_block_size
,
context
.
getDataTypeFactory
()
);
current_format
,
buf
,
sample
,
insert_format_max_block_size
);
BlockInputStreamPtr
async_block_input
=
new
AsynchronousBlockInputStream
(
block_input
);
...
...
dbms/src/Client/Connection.cpp
浏览文件 @
0984363b
...
...
@@ -166,30 +166,30 @@ void Connection::forceConnected()
struct
PingTimeoutSetter
{
PingTimeoutSetter
(
Poco
::
Net
::
StreamSocket
&
socket_
,
const
Poco
::
Timespan
&
ping_timeout_
)
PingTimeoutSetter
(
Poco
::
Net
::
StreamSocket
&
socket_
,
const
Poco
::
Timespan
&
ping_timeout_
)
:
socket
(
socket_
),
ping_timeout
(
ping_timeout_
)
{
old_send_timeout
=
socket
.
getSendTimeout
();
old_receive_timeout
=
socket
.
getReceiveTimeout
();
if
(
old_send_timeout
>
ping_timeout
)
socket
.
setSendTimeout
(
ping_timeout
);
if
(
old_receive_timeout
>
ping_timeout
)
socket
.
setReceiveTimeout
(
ping_timeout
);
}
~
PingTimeoutSetter
()
{
socket
.
setSendTimeout
(
old_send_timeout
);
socket
.
setReceiveTimeout
(
old_receive_timeout
);
}
Poco
::
Net
::
StreamSocket
&
socket
;
Poco
::
Timespan
ping_timeout
;
Poco
::
Timespan
old_send_timeout
;
Poco
::
Timespan
old_receive_timeout
;
};
bool
Connection
::
ping
()
{
// LOG_TRACE(log_wrapper.get(), "Ping (" << getServerAddress() << ")");
...
...
@@ -237,7 +237,7 @@ bool Connection::ping()
void
Connection
::
sendQuery
(
const
String
&
query
,
const
String
&
query_id_
,
UInt64
stage
,
const
Settings
*
settings
,
bool
with_pending_data
)
{
network_compression_method
=
settings
?
settings
->
network_compression_method
.
value
:
CompressionMethod
::
LZ4
;
forceConnected
();
query_id
=
query_id_
;
...
...
@@ -494,7 +494,7 @@ void Connection::initBlockInput()
else
maybe_compressed_in
=
in
;
block_in
=
new
NativeBlockInputStream
(
*
maybe_compressed_in
,
data_type_factory
,
server_revision
);
block_in
=
new
NativeBlockInputStream
(
*
maybe_compressed_in
,
server_revision
);
}
}
...
...
dbms/src/Core/NamesAndTypes.cpp
浏览文件 @
0984363b
#include <DB/Core/NamesAndTypes.h>
#include <DB/DataTypes/DataTypeFactory.h>
namespace
DB
{
void
NamesAndTypesList
::
readText
(
ReadBuffer
&
buf
,
const
DataTypeFactory
&
data_type_factory
)
void
NamesAndTypesList
::
readText
(
ReadBuffer
&
buf
)
{
const
DataTypeFactory
&
data_type_factory
=
DataTypeFactory
::
instance
();
DB
::
assertString
(
"columns format version: 1
\n
"
,
buf
);
size_t
count
;
DB
::
readText
(
count
,
buf
);
...
...
@@ -45,11 +49,11 @@ String NamesAndTypesList::toString() const
return
s
;
}
NamesAndTypesList
NamesAndTypesList
::
parse
(
const
String
&
s
,
const
DataTypeFactory
&
data_type_factory
)
NamesAndTypesList
NamesAndTypesList
::
parse
(
const
String
&
s
)
{
ReadBufferFromString
in
(
s
);
NamesAndTypesList
res
;
res
.
readText
(
in
,
data_type_factory
);
res
.
readText
(
in
);
assertEOF
(
in
);
return
res
;
}
...
...
dbms/src/DataStreams/FormatFactory.cpp
浏览文件 @
0984363b
...
...
@@ -25,10 +25,10 @@ namespace DB
{
BlockInputStreamPtr
FormatFactory
::
getInput
(
const
String
&
name
,
ReadBuffer
&
buf
,
Block
&
sample
,
size_t
max_block_size
,
const
DataTypeFactory
&
data_type_factory
)
const
Block
&
sample
,
size_t
max_block_size
)
const
{
if
(
name
==
"Native"
)
return
new
NativeBlockInputStream
(
buf
,
data_type_factory
);
return
new
NativeBlockInputStream
(
buf
);
else
if
(
name
==
"TabSeparated"
)
return
new
BlockInputStreamFromRowInputStream
(
new
TabSeparatedRowInputStream
(
buf
,
sample
),
sample
,
max_block_size
);
else
if
(
name
==
"RowBinary"
)
...
...
dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
浏览文件 @
0984363b
...
...
@@ -65,7 +65,7 @@ Block MergeSortingBlockInputStream::readImpl()
/// Сформируем сортированные потоки для слияния.
for
(
const
auto
&
file
:
temporary_files
)
{
temporary_inputs
.
emplace_back
(
new
TemporaryFileStream
(
file
->
path
()
,
data_type_factory
));
temporary_inputs
.
emplace_back
(
new
TemporaryFileStream
(
file
->
path
()));
inputs_to_merge
.
emplace_back
(
temporary_inputs
.
back
()
->
block_in
);
}
...
...
dbms/src/DataStreams/NativeBlockInputStream.cpp
浏览文件 @
0984363b
...
...
@@ -5,6 +5,7 @@
#include <DB/Columns/ColumnArray.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/DataStreams/NativeBlockInputStream.h>
...
...
@@ -44,6 +45,8 @@ Block NativeBlockInputStream::readImpl()
{
Block
res
;
const
DataTypeFactory
&
data_type_factory
=
DataTypeFactory
::
instance
();
if
(
istr
.
eof
())
return
res
;
...
...
dbms/src/DataStreams/tests/native_streams.cpp
浏览文件 @
0984363b
...
...
@@ -15,7 +15,6 @@
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/DataStreams/NativeBlockInputStream.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
...
...
@@ -117,8 +116,6 @@ int main(int argc, char ** argv)
/// читаем данные из native файла и одновременно пишем в таблицу
if
(
argc
==
2
&&
0
==
strcmp
(
argv
[
1
],
"write"
))
{
DataTypeFactory
factory
;
ReadBufferFromFileDescriptor
in1
(
STDIN_FILENO
);
CompressedReadBuffer
in2
(
in1
);
NativeBlockInputStream
in3
(
in2
,
factory
,
Revision
::
get
());
...
...
dbms/src/DataStreams/tests/sorting_stream.cpp
浏览文件 @
0984363b
...
...
@@ -148,11 +148,10 @@ int main(int argc, char ** argv)
sort_columns
.
push_back
(
SortColumnDescription
(
3
,
1
));
QueryProcessingStage
::
Enum
stage
;
DataTypeFactory
data_type_factory
;
Poco
::
SharedPtr
<
IBlockInputStream
>
in
=
table
->
read
(
column_names
,
0
,
Context
{},
Settings
(),
stage
,
argc
==
2
?
atoi
(
argv
[
1
])
:
1048576
)[
0
];
in
=
new
PartialSortingBlockInputStream
(
in
,
sort_columns
);
in
=
new
MergeSortingBlockInputStream
(
in
,
sort_columns
,
DEFAULT_BLOCK_SIZE
,
0
,
0
,
""
,
data_type_factory
);
in
=
new
MergeSortingBlockInputStream
(
in
,
sort_columns
,
DEFAULT_BLOCK_SIZE
,
0
,
0
,
""
);
//in = new LimitBlockInputStream(in, 10);
WriteBufferFromOStream
ob
(
std
::
cout
);
...
...
dbms/src/Interpreters/Cluster.cpp
浏览文件 @
0984363b
...
...
@@ -47,7 +47,7 @@ namespace
}
Clusters
::
Clusters
(
const
Settings
&
settings
,
const
DataTypeFactory
&
data_type_factory
,
const
String
&
config_name
)
Clusters
::
Clusters
(
const
Settings
&
settings
,
const
String
&
config_name
)
{
Poco
::
Util
::
AbstractConfiguration
&
config
=
Poco
::
Util
::
Application
::
instance
().
config
();
Poco
::
Util
::
AbstractConfiguration
::
Keys
config_keys
;
...
...
@@ -56,11 +56,11 @@ Clusters::Clusters(const Settings & settings, const DataTypeFactory & data_type_
for
(
Poco
::
Util
::
AbstractConfiguration
::
Keys
::
const_iterator
it
=
config_keys
.
begin
();
it
!=
config_keys
.
end
();
++
it
)
impl
.
emplace
(
std
::
piecewise_construct
,
std
::
forward_as_tuple
(
*
it
),
std
::
forward_as_tuple
(
settings
,
data_type_factory
,
config_name
+
"."
+
*
it
));
std
::
forward_as_tuple
(
settings
,
config_name
+
"."
+
*
it
));
}
Cluster
::
Cluster
(
const
Settings
&
settings
,
const
DataTypeFactory
&
data_type_factory
,
const
String
&
cluster_name
)
Cluster
::
Cluster
(
const
Settings
&
settings
,
const
String
&
cluster_name
)
{
Poco
::
Util
::
AbstractConfiguration
&
config
=
Poco
::
Util
::
Application
::
instance
().
config
();
Poco
::
Util
::
AbstractConfiguration
::
Keys
config_keys
;
...
...
@@ -179,7 +179,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
replicas
.
emplace_back
(
new
ConnectionPool
(
settings
.
distributed_connections_pool_size
,
replica
.
host_port
.
host
().
toString
(),
replica
.
host_port
.
port
(),
""
,
replica
.
user
,
replica
.
password
,
data_type_factory
,
"server"
,
Protocol
::
Compression
::
Enable
,
"server"
,
Protocol
::
Compression
::
Enable
,
saturate
(
settings
.
connect_timeout_with_failover_ms
,
settings
.
limits
.
max_execution_time
),
saturate
(
settings
.
receive_timeout
,
settings
.
limits
.
max_execution_time
),
saturate
(
settings
.
send_timeout
,
settings
.
limits
.
max_execution_time
)));
...
...
@@ -205,7 +205,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
pools
.
emplace_back
(
new
ConnectionPool
(
settings
.
distributed_connections_pool_size
,
address
.
host_port
.
host
().
toString
(),
address
.
host_port
.
port
(),
""
,
address
.
user
,
address
.
password
,
data_type_factory
,
"server"
,
Protocol
::
Compression
::
Enable
,
"server"
,
Protocol
::
Compression
::
Enable
,
saturate
(
settings
.
connect_timeout
,
settings
.
limits
.
max_execution_time
),
saturate
(
settings
.
receive_timeout
,
settings
.
limits
.
max_execution_time
),
saturate
(
settings
.
send_timeout
,
settings
.
limits
.
max_execution_time
)));
...
...
@@ -217,7 +217,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
}
Cluster
::
Cluster
(
const
Settings
&
settings
,
const
DataTypeFactory
&
data_type_factory
,
std
::
vector
<
std
::
vector
<
String
>>
names
,
Cluster
::
Cluster
(
const
Settings
&
settings
,
std
::
vector
<
std
::
vector
<
String
>>
names
,
const
String
&
username
,
const
String
&
password
)
{
for
(
const
auto
&
shard
:
names
)
...
...
@@ -238,7 +238,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
replicas
.
emplace_back
(
new
ConnectionPool
(
settings
.
distributed_connections_pool_size
,
replica
.
host_port
.
host
().
toString
(),
replica
.
host_port
.
port
(),
""
,
replica
.
user
,
replica
.
password
,
data_type_factory
,
"server"
,
Protocol
::
Compression
::
Enable
,
"server"
,
Protocol
::
Compression
::
Enable
,
saturate
(
settings
.
connect_timeout_with_failover_ms
,
settings
.
limits
.
max_execution_time
),
saturate
(
settings
.
receive_timeout
,
settings
.
limits
.
max_execution_time
),
saturate
(
settings
.
send_timeout
,
settings
.
limits
.
max_execution_time
)));
...
...
dbms/src/Interpreters/Context.cpp
浏览文件 @
0984363b
...
...
@@ -13,7 +13,6 @@
#include <DB/DataStreams/FormatFactory.h>
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
#include <DB/TableFunctions/TableFunctionFactory.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/Storages/IStorage.h>
#include <DB/Storages/MarkCache.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
...
...
@@ -72,7 +71,6 @@ struct ContextShared
Databases
databases
;
/// Список БД и таблиц в них.
TableFunctionFactory
table_function_factory
;
/// Табличные функции.
AggregateFunctionFactory
aggregate_function_factory
;
/// Агрегатные функции.
DataTypeFactory
data_type_factory
;
/// Типы данных.
FormatFactory
format_factory
;
/// Форматы.
mutable
SharedPtr
<
Dictionaries
>
dictionaries
;
/// Словари Метрики. Инициализируются лениво.
mutable
SharedPtr
<
ExternalDictionaries
>
external_dictionaries
;
...
...
@@ -155,7 +153,6 @@ Context::~Context() = default;
const
TableFunctionFactory
&
Context
::
getTableFunctionFactory
()
const
{
return
shared
->
table_function_factory
;
}
const
AggregateFunctionFactory
&
Context
::
getAggregateFunctionFactory
()
const
{
return
shared
->
aggregate_function_factory
;
}
const
DataTypeFactory
&
Context
::
getDataTypeFactory
()
const
{
return
shared
->
data_type_factory
;
}
const
FormatFactory
&
Context
::
getFormatFactory
()
const
{
return
shared
->
format_factory
;
}
InterserverIOHandler
&
Context
::
getInterserverIOHandler
()
{
return
shared
->
interserver_io_handler
;
}
Poco
::
Mutex
&
Context
::
getMutex
()
const
{
return
shared
->
mutex
;
}
...
...
@@ -800,7 +797,7 @@ void Context::initClusters()
{
Poco
::
ScopedLock
<
Poco
::
Mutex
>
lock
(
shared
->
mutex
);
if
(
!
shared
->
clusters
)
shared
->
clusters
=
new
Clusters
(
settings
,
shared
->
data_type_factory
);
shared
->
clusters
=
new
Clusters
(
settings
);
}
Cluster
&
Context
::
getCluster
(
const
std
::
string
&
cluster_name
)
...
...
dbms/src/Interpreters/InterpreterAlterQuery.cpp
浏览文件 @
0984363b
...
...
@@ -11,6 +11,7 @@
#include <DB/IO/copyData.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/Parsers/formatAST.h>
#include <DB/Parsers/parseQuery.h>
...
...
@@ -35,7 +36,7 @@ void InterpreterAlterQuery::execute()
AlterCommands
alter_commands
;
PartitionCommands
partition_commands
;
parseAlter
(
alter
.
parameters
,
context
.
getDataTypeFactory
(),
alter_commands
,
partition_commands
);
parseAlter
(
alter
.
parameters
,
alter_commands
,
partition_commands
);
for
(
const
PartitionCommand
&
command
:
partition_commands
)
{
...
...
@@ -71,9 +72,11 @@ void InterpreterAlterQuery::execute()
}
void
InterpreterAlterQuery
::
parseAlter
(
const
ASTAlterQuery
::
ParameterContainer
&
params_container
,
const
DataTypeFactory
&
data_type_factory
,
const
ASTAlterQuery
::
ParameterContainer
&
params_container
,
AlterCommands
&
out_alter_commands
,
PartitionCommands
&
out_partition_commands
)
{
const
DataTypeFactory
&
data_type_factory
=
DataTypeFactory
::
instance
();
for
(
const
auto
&
params
:
params_container
)
{
if
(
params
.
type
==
ASTAlterQuery
::
ADD_COLUMN
)
...
...
dbms/src/Interpreters/InterpreterCreateQuery.cpp
浏览文件 @
0984363b
...
...
@@ -25,9 +25,11 @@
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Interpreters/InterpreterCreateQuery.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/DataTypes/DataTypeFactory.h>
namespace
DB
...
...
@@ -272,6 +274,8 @@ InterpreterCreateQuery::ColumnsAndDefaults InterpreterCreateQuery::parseColumns(
ASTPtr
default_expr_list
{
new
ASTExpressionList
};
default_expr_list
->
children
.
reserve
(
column_list_ast
.
children
.
size
());
const
DataTypeFactory
&
data_type_factory
=
DataTypeFactory
::
instance
();
for
(
auto
&
ast
:
column_list_ast
.
children
)
{
auto
&
col_decl
=
typeid_cast
<
ASTColumnDeclaration
&>
(
*
ast
);
...
...
@@ -280,7 +284,7 @@ InterpreterCreateQuery::ColumnsAndDefaults InterpreterCreateQuery::parseColumns(
{
const
auto
&
type_range
=
col_decl
.
type
->
range
;
columns
.
emplace_back
(
col_decl
.
name
,
context
.
getDataTypeFactory
()
.
get
({
type_range
.
first
,
type_range
.
second
}));
data_type_factory
.
get
({
type_range
.
first
,
type_range
.
second
}));
}
else
/// we're creating dummy DataTypeUInt8 in order to prevent the NullPointerException in ExpressionActions
...
...
dbms/src/Interpreters/InterpreterInsertQuery.cpp
浏览文件 @
0984363b
...
...
@@ -114,8 +114,7 @@ void InterpreterInsertQuery::execute(ReadBuffer * remaining_data_istr)
BlockInputStreamPtr
in
{
context
.
getFormatFactory
().
getInput
(
format
,
istr
,
sample
,
context
.
getSettings
().
max_insert_block_size
,
context
.
getDataTypeFactory
())};
format
,
istr
,
sample
,
context
.
getSettings
().
max_insert_block_size
)};
copyData
(
*
in
,
*
out
);
}
...
...
dbms/src/Interpreters/InterpreterSelectQuery.cpp
浏览文件 @
0984363b
...
...
@@ -887,7 +887,7 @@ void InterpreterSelectQuery::executeOrder(BlockInputStreams & streams)
/// Сливаем сортированные блоки.
stream
=
new
MergeSortingBlockInputStream
(
stream
,
order_descr
,
settings
.
max_block_size
,
limit
,
settings
.
limits
.
max_bytes_before_external_sort
,
context
.
getTemporaryPath
()
,
context
.
getDataTypeFactory
()
);
settings
.
limits
.
max_bytes_before_external_sort
,
context
.
getTemporaryPath
());
}
...
...
dbms/src/Interpreters/tests/expression_analyzer.cpp
浏览文件 @
0984363b
...
...
@@ -3,6 +3,7 @@
#include <DB/Parsers/formatAST.h>
#include <DB/Parsers/parseQuery.h>
#include <DB/Parsers/ExpressionListParsers.h>
#include <DB/DataTypes/DataTypeFactory.h>
int
main
(
int
argc
,
char
**
argv
)
...
...
@@ -25,7 +26,7 @@ int main(int argc, char ** argv)
{
NameAndTypePair
col
;
col
.
name
=
argv
[
i
];
col
.
type
=
context
.
getDataTypeFactory
().
get
(
argv
[
i
+
1
]);
col
.
type
=
DataTypeFactory
::
instance
().
get
(
argv
[
i
+
1
]);
columns
.
push_back
(
col
);
}
...
...
dbms/src/Server/TCPHandler.cpp
浏览文件 @
0984363b
...
...
@@ -606,7 +606,6 @@ void TCPHandler::initBlockInput()
state
.
block_in
=
new
NativeBlockInputStream
(
*
state
.
maybe_compressed_in
,
query_context
.
getDataTypeFactory
(),
client_revision
);
}
}
...
...
dbms/src/Storages/ColumnsDescription.cpp
浏览文件 @
0984363b
#include <DB/Parsers/ExpressionListParsers.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/Storages/ColumnsDescription.h>
#include <DB/DataTypes/DataTypeFactory.h>
namespace
DB
...
...
@@ -50,7 +51,7 @@ String ColumnsDescription<store>::toString() const
template
<
>
ColumnsDescription
<
true
>
ColumnsDescription
<
true
>::
parse
(
const
String
&
str
,
const
DataTypeFactory
&
data_type_factory
)
ColumnsDescription
<
true
>
ColumnsDescription
<
true
>::
parse
(
const
String
&
str
)
{
ReadBufferFromString
buf
{
str
};
...
...
@@ -60,6 +61,7 @@ ColumnsDescription<true> ColumnsDescription<true>::parse(const String & str, con
assertString
(
" columns:
\n
"
,
buf
);
ParserTernaryOperatorExpression
expr_parser
;
const
DataTypeFactory
&
data_type_factory
=
DataTypeFactory
::
instance
();
ColumnsDescription
<
true
>
result
{};
for
(
size_t
i
=
0
;
i
<
count
;
++
i
)
...
...
dbms/src/Storages/MergeTree/MergeTreeData.cpp
浏览文件 @
0984363b
...
...
@@ -1037,7 +1037,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const St
MergeTreePartChecker
::
Settings
settings
;
settings
.
setIndexGranularity
(
index_granularity
);
settings
.
setRequireColumnFiles
(
true
);
MergeTreePartChecker
::
checkDataPart
(
full_path
+
relative_path
,
settings
,
context
.
getDataTypeFactory
(),
&
part
->
checksums
);
MergeTreePartChecker
::
checkDataPart
(
full_path
+
relative_path
,
settings
,
&
part
->
checksums
);
{
WriteBufferFromFile
out
(
full_path
+
relative_path
+
"/checksums.txt.tmp"
,
4096
);
...
...
dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp
浏览文件 @
0984363b
...
...
@@ -249,8 +249,7 @@ static size_t checkColumn(const String & path, const String & name, DataTypePtr
}
}
void
MergeTreePartChecker
::
checkDataPart
(
String
path
,
const
Settings
&
settings
,
const
DataTypeFactory
&
data_type_factory
,
MergeTreeData
::
DataPart
::
Checksums
*
out_checksums
)
void
MergeTreePartChecker
::
checkDataPart
(
String
path
,
const
Settings
&
settings
,
MergeTreeData
::
DataPart
::
Checksums
*
out_checksums
)
{
if
(
!
path
.
empty
()
&&
path
.
back
()
!=
'/'
)
path
+=
"/"
;
...
...
@@ -262,7 +261,7 @@ void MergeTreePartChecker::checkDataPart(String path, const Settings & settings,
{
ReadBufferFromFile
buf
(
path
+
"columns.txt"
);
columns
.
readText
(
buf
,
data_type_factory
);
columns
.
readText
(
buf
);
assertEOF
(
buf
);
}
...
...
@@ -275,12 +274,11 @@ void MergeTreePartChecker::checkDataPart(String path, const Settings & settings,
/// Реальные чексуммы по содержимому данных. Их несоответствие checksums_txt будет говорить о битых данных.
MergeTreeData
::
DataPart
::
Checksums
checksums_data
;
size_t
primary_idx_size
;
{
ReadBufferFromFile
file_buf
(
path
+
"primary.idx"
);
HashingReadBuffer
hashing_buf
(
file_buf
);
primary_idx_size
=
hashing_buf
.
tryIgnore
(
std
::
numeric_limits
<
size_t
>::
max
());
size_t
primary_idx_size
=
hashing_buf
.
tryIgnore
(
std
::
numeric_limits
<
size_t
>::
max
());
checksums_data
.
files
[
"primary.idx"
]
=
MergeTreeData
::
DataPart
::
Checksums
::
Checksum
(
primary_idx_size
,
hashing_buf
.
getHash
());
}
...
...
@@ -345,9 +343,9 @@ void MergeTreePartChecker::checkDataPart(String path, const Settings & settings,
if
(
rows
==
Stream
::
UNKNOWN
)
throw
Exception
(
"No columns"
,
ErrorCodes
::
EMPTY_LIST_OF_COLUMNS_PASSED
);
if
(
primary_idx_size
%
((
rows
-
1
)
/
settings
.
index_granularity
+
1
))
/*
if (primary_idx_size % ((rows - 1) / settings.index_granularity + 1))
throw Exception("primary.idx size (" + toString(primary_idx_size) + ") not divisible by number of marks ("
+
toString
(
rows
)
+
"/"
+
toString
(
settings
.
index_granularity
)
+
" rounded up)"
,
ErrorCodes
::
CORRUPTED_DATA
);
+ toString(rows) + "/" + toString(settings.index_granularity) + " rounded up)", ErrorCodes::CORRUPTED_DATA);
*/
if
(
settings
.
require_checksums
||
!
checksums_txt
.
files
.
empty
())
checksums_txt
.
checkEqual
(
checksums_data
,
true
);
...
...
dbms/src/Storages/StorageReplicatedMergeTree.cpp
浏览文件 @
0984363b
...
...
@@ -268,8 +268,7 @@ void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bo
assertEOF
(
buf
);
zkutil
::
Stat
stat
;
auto
columns_desc
=
ColumnsDescription
<
true
>::
parse
(
zookeeper
->
get
(
zookeeper_path
+
"/columns"
,
&
stat
),
context
.
getDataTypeFactory
());
auto
columns_desc
=
ColumnsDescription
<
true
>::
parse
(
zookeeper
->
get
(
zookeeper_path
+
"/columns"
,
&
stat
));
auto
&
columns
=
columns_desc
.
columns
;
auto
&
materialized_columns
=
columns_desc
.
materialized
;
...
...
@@ -1459,7 +1458,7 @@ void StorageReplicatedMergeTree::alterThread()
zkutil
::
Stat
stat
;
const
String
columns_str
=
zookeeper
->
get
(
zookeeper_path
+
"/columns"
,
&
stat
,
alter_thread_event
);
auto
columns_desc
=
ColumnsDescription
<
true
>::
parse
(
columns_str
,
context
.
getDataTypeFactory
()
);
auto
columns_desc
=
ColumnsDescription
<
true
>::
parse
(
columns_str
);
auto
&
columns
=
columns_desc
.
columns
;
auto
&
materialized_columns
=
columns_desc
.
materialized
;
...
...
@@ -1809,7 +1808,7 @@ void StorageReplicatedMergeTree::partCheckThread()
zk_checksums
.
checkEqual
(
part
->
checksums
,
true
);
auto
zk_columns
=
NamesAndTypesList
::
parse
(
zookeeper
->
get
(
replica_path
+
"/parts/"
+
part_name
+
"/columns"
)
,
context
.
getDataTypeFactory
()
);
zookeeper
->
get
(
replica_path
+
"/parts/"
+
part_name
+
"/columns"
));
if
(
part
->
columns
!=
zk_columns
)
throw
Exception
(
"Columns of local part "
+
part_name
+
" are different from ZooKeeper"
);
...
...
@@ -1818,7 +1817,7 @@ void StorageReplicatedMergeTree::partCheckThread()
settings
.
setRequireChecksums
(
true
);
settings
.
setRequireColumnFiles
(
true
);
MergeTreePartChecker
::
checkDataPart
(
data
.
getFullPath
()
+
part_name
,
settings
,
context
.
getDataTypeFactory
()
);
data
.
getFullPath
()
+
part_name
,
settings
);
LOG_INFO
(
log
,
"Part "
<<
part_name
<<
" looks good."
);
}
...
...
dbms/src/Storages/StorageSet.cpp
浏览文件 @
0984363b
...
...
@@ -87,8 +87,6 @@ void StorageSetOrJoinBase::restore()
constexpr
auto
file_suffix
=
".bin"
;
constexpr
auto
file_suffix_size
=
strlen
(
file_suffix
);
DataTypeFactory
data_type_factory
;
Poco
::
DirectoryIterator
dir_end
;
for
(
Poco
::
DirectoryIterator
dir_it
(
path
);
dir_end
!=
dir_it
;
++
dir_it
)
{
...
...
@@ -104,17 +102,17 @@ void StorageSetOrJoinBase::restore()
if
(
file_num
>
increment
)
increment
=
file_num
;
restoreFromFile
(
dir_it
->
path
()
,
data_type_factory
);
restoreFromFile
(
dir_it
->
path
());
}
}
}
void
StorageSetOrJoinBase
::
restoreFromFile
(
const
String
&
file_path
,
const
DataTypeFactory
&
data_type_factory
)
void
StorageSetOrJoinBase
::
restoreFromFile
(
const
String
&
file_path
)
{
ReadBufferFromFile
backup_buf
(
file_path
);
CompressedReadBuffer
compressed_backup_buf
(
backup_buf
);
NativeBlockInputStream
backup_stream
(
compressed_backup_buf
,
data_type_factory
);
NativeBlockInputStream
backup_stream
(
compressed_backup_buf
);
backup_stream
.
readPrefix
();
while
(
Block
block
=
backup_stream
.
read
())
...
...
dbms/src/Storages/tests/part_checker.cpp
浏览文件 @
0984363b
...
...
@@ -23,7 +23,7 @@ int main(int argc, char ** argv)
settings
.
setRequireColumnFiles
(
argv
[
2
][
0
]
==
'1'
);
settings
.
setVerbose
(
true
);
DB
::
MergeTreePartChecker
::
checkDataPart
(
argv
[
1
],
settings
,
DB
::
DataTypeFactory
()
);
DB
::
MergeTreePartChecker
::
checkDataPart
(
argv
[
1
],
settings
);
}
catch
(...)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录