Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
07d5a1ec
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
07d5a1ec
编写于
4月 02, 2021
作者:
M
Maksim Kita
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
ClickHouseDictionarySource loop fix
上级
c3a9cbe0
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
173 addition
and
96 deletion
+173
-96
src/Dictionaries/ClickHouseDictionarySource.cpp
src/Dictionaries/ClickHouseDictionarySource.cpp
+87
-79
src/Dictionaries/ClickHouseDictionarySource.h
src/Dictionaries/ClickHouseDictionarySource.h
+19
-17
tests/queries/0_stateless/01780_clickhouse_dictionary_source_loop.reference
...ateless/01780_clickhouse_dictionary_source_loop.reference
+3
-0
tests/queries/0_stateless/01780_clickhouse_dictionary_source_loop.sql
...s/0_stateless/01780_clickhouse_dictionary_source_loop.sql
+64
-0
未找到文件。
src/Dictionaries/ClickHouseDictionarySource.cpp
浏览文件 @
07d5a1ec
...
...
@@ -18,97 +18,72 @@
namespace
DB
{
static
const
size_t
MAX_CONNECTIONS
=
16
;
inline
static
UInt16
getPortFromContext
(
const
Context
&
context
,
bool
secure
)
namespace
ErrorCodes
{
return
secure
?
context
.
getTCPPortSecure
().
value_or
(
0
)
:
context
.
getTCPPort
()
;
extern
const
int
BAD_ARGUMENTS
;
}
static
ConnectionPoolWithFailoverPtr
createPool
(
const
std
::
string
&
host
,
UInt16
port
,
bool
secure
,
const
std
::
string
&
db
,
const
std
::
string
&
user
,
const
std
::
string
&
password
)
namespace
{
ConnectionPoolPtrs
pools
;
pools
.
emplace_back
(
std
::
make_shared
<
ConnectionPool
>
(
MAX_CONNECTIONS
,
host
,
port
,
db
,
user
,
password
,
""
,
/* cluster */
""
,
/* cluster_secret */
"ClickHouseDictionarySource"
,
Protocol
::
Compression
::
Enable
,
secure
?
Protocol
::
Secure
::
Enable
:
Protocol
::
Secure
::
Disable
));
return
std
::
make_shared
<
ConnectionPoolWithFailover
>
(
pools
,
LoadBalancing
::
RANDOM
);
}
constexpr
size_t
MAX_CONNECTIONS
=
16
;
inline
UInt16
getPortFromContext
(
const
Context
&
context
,
bool
secure
)
{
return
secure
?
context
.
getTCPPortSecure
().
value_or
(
0
)
:
context
.
getTCPPort
();
}
ConnectionPoolWithFailoverPtr
createPool
(
const
ClickHouseDictionarySource
::
Configuration
&
configuration
)
{
if
(
configuration
.
is_local
)
return
nullptr
;
ConnectionPoolPtrs
pools
;
pools
.
emplace_back
(
std
::
make_shared
<
ConnectionPool
>
(
MAX_CONNECTIONS
,
configuration
.
host
,
configuration
.
port
,
configuration
.
db
,
configuration
.
user
,
configuration
.
password
,
""
,
/* cluster */
""
,
/* cluster_secret */
"ClickHouseDictionarySource"
,
Protocol
::
Compression
::
Enable
,
configuration
.
secure
?
Protocol
::
Secure
::
Enable
:
Protocol
::
Secure
::
Disable
));
return
std
::
make_shared
<
ConnectionPoolWithFailover
>
(
pools
,
LoadBalancing
::
RANDOM
);
}
}
ClickHouseDictionarySource
::
ClickHouseDictionarySource
(
const
DictionaryStructure
&
dict_struct_
,
const
Poco
::
Util
::
AbstractConfiguration
&
config
,
const
std
::
string
&
path_to_settings
,
const
std
::
string
&
config_prefix
,
const
Configuration
&
configuration_
,
const
Block
&
sample_block_
,
const
Context
&
context_
,
const
std
::
string
&
default_database
)
const
Context
&
context_
)
:
update_time
{
std
::
chrono
::
system_clock
::
from_time_t
(
0
)}
,
dict_struct
{
dict_struct_
}
,
secure
(
config
.
getBool
(
config_prefix
+
".secure"
,
false
))
,
host
{
config
.
getString
(
config_prefix
+
".host"
,
"localhost"
)}
,
port
(
config
.
getInt
(
config_prefix
+
".port"
,
getPortFromContext
(
context_
,
secure
)))
,
user
{
config
.
getString
(
config_prefix
+
".user"
,
"default"
)}
,
password
{
config
.
getString
(
config_prefix
+
".password"
,
""
)}
,
db
{
config
.
getString
(
config_prefix
+
".db"
,
default_database
)}
,
table
{
config
.
getString
(
config_prefix
+
".table"
)}
,
where
{
config
.
getString
(
config_prefix
+
".where"
,
""
)}
,
update_field
{
config
.
getString
(
config_prefix
+
".update_field"
,
""
)}
,
invalidate_query
{
config
.
getString
(
config_prefix
+
".invalidate_query"
,
""
)}
,
query_builder
{
dict_struct
,
db
,
""
,
table
,
where
,
IdentifierQuotingStyle
::
Backticks
}
,
configuration
{
configuration_
}
,
query_builder
{
dict_struct
,
configuration
.
db
,
""
,
configuration
.
table
,
configuration
.
where
,
IdentifierQuotingStyle
::
Backticks
}
,
sample_block
{
sample_block_
}
,
context
(
context_
)
,
is_local
{
isLocalAddress
({
host
,
port
},
getPortFromContext
(
context_
,
secure
))}
,
pool
{
is_local
?
nullptr
:
createPool
(
host
,
port
,
secure
,
db
,
user
,
password
)}
,
context
{
context_
}
,
pool
{
createPool
(
configuration
)}
,
load_all_query
{
query_builder
.
composeLoadAllQuery
()}
{
/// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication).
if
(
is_local
)
{
context
.
setUser
(
user
,
password
,
Poco
::
Net
::
SocketAddress
(
"127.0.0.1"
,
0
));
context
=
copyContextAndApplySettings
(
path_to_settings
,
context
,
config
);
}
/// Query context is needed because some code in executeQuery function may assume it exists.
/// Current example is Context::getSampleBlockCache from InterpreterSelectWithUnionQuery::getSampleBlock.
context
.
makeQueryContext
();
}
ClickHouseDictionarySource
::
ClickHouseDictionarySource
(
const
ClickHouseDictionarySource
&
other
)
:
update_time
{
other
.
update_time
}
,
dict_struct
{
other
.
dict_struct
}
,
secure
{
other
.
secure
}
,
host
{
other
.
host
}
,
port
{
other
.
port
}
,
user
{
other
.
user
}
,
password
{
other
.
password
}
,
db
{
other
.
db
}
,
table
{
other
.
table
}
,
where
{
other
.
where
}
,
update_field
{
other
.
update_field
}
,
invalidate_query
{
other
.
invalidate_query
}
,
configuration
{
other
.
configuration
}
,
invalidate_query_response
{
other
.
invalidate_query_response
}
,
query_builder
{
dict_struct
,
db
,
""
,
table
,
where
,
IdentifierQuotingStyle
::
Backticks
}
,
query_builder
{
dict_struct
,
configuration
.
db
,
""
,
configuration
.
table
,
configuration
.
where
,
IdentifierQuotingStyle
::
Backticks
}
,
sample_block
{
other
.
sample_block
}
,
context
(
other
.
context
)
,
is_local
{
other
.
is_local
}
,
pool
{
is_local
?
nullptr
:
createPool
(
host
,
port
,
secure
,
db
,
user
,
password
)}
,
context
{
other
.
context
}
,
pool
{
createPool
(
configuration
)}
,
load_all_query
{
other
.
load_all_query
}
{
context
.
makeQueryContext
();
...
...
@@ -121,7 +96,7 @@ std::string ClickHouseDictionarySource::getUpdateFieldAndDate()
time_t
hr_time
=
std
::
chrono
::
system_clock
::
to_time_t
(
update_time
)
-
1
;
std
::
string
str_time
=
DateLUT
::
instance
().
timeToString
(
hr_time
);
update_time
=
std
::
chrono
::
system_clock
::
now
();
return
query_builder
.
composeUpdateQuery
(
update_field
,
str_time
);
return
query_builder
.
composeUpdateQuery
(
configuration
.
update_field
,
str_time
);
}
else
{
...
...
@@ -155,9 +130,9 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadKeys(const Columns & key_col
bool
ClickHouseDictionarySource
::
isModified
()
const
{
if
(
!
invalidate_query
.
empty
())
if
(
!
configuration
.
invalidate_query
.
empty
())
{
auto
response
=
doInvalidateQuery
(
invalidate_query
);
auto
response
=
doInvalidateQuery
(
configuration
.
invalidate_query
);
LOG_TRACE
(
log
,
"Invalidate query has returned: {}, previous value: {}"
,
response
,
invalidate_query_response
);
if
(
invalidate_query_response
==
response
)
return
false
;
...
...
@@ -168,21 +143,21 @@ bool ClickHouseDictionarySource::isModified() const
bool
ClickHouseDictionarySource
::
hasUpdateField
()
const
{
return
!
update_field
.
empty
();
return
!
configuration
.
update_field
.
empty
();
}
std
::
string
ClickHouseDictionarySource
::
toString
()
const
{
return
"ClickHouse: "
+
db
+
'.'
+
table
+
(
where
.
empty
()
?
""
:
", where: "
+
where
);
const
std
::
string
&
where
=
configuration
.
where
;
return
"ClickHouse: "
+
configuration
.
db
+
'.'
+
configuration
.
table
+
(
where
.
empty
()
?
""
:
", where: "
+
where
);
}
BlockInputStreamPtr
ClickHouseDictionarySource
::
createStreamForQuery
(
const
String
&
query
)
{
/// Sample block should not contain first row default values
auto
empty_sample_block
=
sample_block
.
cloneEmpty
();
if
(
is_local
)
if
(
configuration
.
is_local
)
{
auto
stream
=
executeQuery
(
query
,
context
,
true
).
getInputStream
();
stream
=
std
::
make_shared
<
ConvertingBlockInputStream
>
(
stream
,
empty_sample_block
,
ConvertingBlockInputStream
::
MatchColumnsMode
::
Position
);
...
...
@@ -195,7 +170,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::createStreamForQuery(const Strin
std
::
string
ClickHouseDictionarySource
::
doInvalidateQuery
(
const
std
::
string
&
request
)
const
{
LOG_TRACE
(
log
,
"Performing invalidate query"
);
if
(
is_local
)
if
(
configuration
.
is_local
)
{
Context
query_context
=
context
;
auto
input_block
=
executeQuery
(
request
,
query_context
,
true
).
getInputStream
();
...
...
@@ -210,7 +185,6 @@ std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & re
}
}
void
registerDictionarySourceClickHouse
(
DictionarySourceFactory
&
factory
)
{
auto
create_table_source
=
[
=
](
const
DictionaryStructure
&
dict_struct
,
...
...
@@ -218,12 +192,46 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
const
std
::
string
&
config_prefix
,
Block
&
sample_block
,
const
Context
&
context
,
const
std
::
string
&
default_database
,
const
std
::
string
&
default_database
[[
maybe_unused
]]
,
bool
/* check_config */
)
->
DictionarySourcePtr
{
return
std
::
make_unique
<
ClickHouseDictionarySource
>
(
dict_struct
,
config
,
config_prefix
,
config_prefix
+
".clickhouse"
,
sample_block
,
context
,
default_database
);
bool
secure
=
config
.
getBool
(
config_prefix
+
".secure"
,
false
);
Context
context_copy
=
context
;
UInt16
default_port
=
getPortFromContext
(
context_copy
,
secure
);
std
::
string
settings_config_prefix
=
config_prefix
+
".clickhouse"
;
ClickHouseDictionarySource
::
Configuration
configuration
{
.
secure
=
config
.
getBool
(
settings_config_prefix
+
".secure"
,
false
),
.
host
=
config
.
getString
(
settings_config_prefix
+
".host"
,
"localhost"
),
.
port
=
static_cast
<
UInt16
>
(
config
.
getUInt
(
settings_config_prefix
+
".port"
,
default_port
)),
.
user
=
config
.
getString
(
settings_config_prefix
+
".user"
,
"default"
),
.
password
=
config
.
getString
(
settings_config_prefix
+
".password"
,
""
),
.
db
=
config
.
getString
(
settings_config_prefix
+
".db"
,
default_database
),
.
table
=
config
.
getString
(
settings_config_prefix
+
".table"
),
.
where
=
config
.
getString
(
settings_config_prefix
+
".where"
,
""
),
.
update_field
=
config
.
getString
(
settings_config_prefix
+
".update_field"
,
""
),
.
invalidate_query
=
config
.
getString
(
settings_config_prefix
+
".invalidate_query"
,
""
),
.
is_local
=
isLocalAddress
({
configuration
.
host
,
configuration
.
port
},
default_port
)
};
/// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication).
if
(
configuration
.
is_local
)
{
context_copy
.
setUser
(
configuration
.
user
,
configuration
.
password
,
Poco
::
Net
::
SocketAddress
(
"127.0.0.1"
,
0
));
context_copy
=
copyContextAndApplySettings
(
config_prefix
,
context_copy
,
config
);
}
String
dictionary_name
=
config
.
getString
(
".dictionary.name"
,
""
);
String
dictionary_database
=
config
.
getString
(
".dictionary.database"
,
""
);
if
(
dictionary_name
==
configuration
.
table
&&
dictionary_database
==
configuration
.
db
)
throw
Exception
(
ErrorCodes
::
BAD_ARGUMENTS
,
"ClickHouseDictionarySource table cannot be dictionary table"
);
return
std
::
make_unique
<
ClickHouseDictionarySource
>
(
dict_struct
,
configuration
,
sample_block
,
context_copy
);
};
factory
.
registerSource
(
"clickhouse"
,
create_table_source
);
}
...
...
src/Dictionaries/ClickHouseDictionarySource.h
浏览文件 @
07d5a1ec
...
...
@@ -18,14 +18,26 @@ namespace DB
class
ClickHouseDictionarySource
final
:
public
IDictionarySource
{
public:
struct
Configuration
{
const
bool
secure
;
const
std
::
string
host
;
const
UInt16
port
;
const
std
::
string
user
;
const
std
::
string
password
;
const
std
::
string
db
;
const
std
::
string
table
;
const
std
::
string
where
;
const
std
::
string
update_field
;
const
std
::
string
invalidate_query
;
const
bool
is_local
;
};
ClickHouseDictionarySource
(
const
DictionaryStructure
&
dict_struct_
,
const
Poco
::
Util
::
AbstractConfiguration
&
config
,
const
std
::
string
&
path_to_settings
,
const
std
::
string
&
config_prefix
,
const
Configuration
&
configuration_
,
const
Block
&
sample_block_
,
const
Context
&
context
,
const
std
::
string
&
default_database
);
const
Context
&
context
);
/// copy-constructor is provided in order to support cloneability
ClickHouseDictionarySource
(
const
ClickHouseDictionarySource
&
other
);
...
...
@@ -50,7 +62,7 @@ public:
/// Used for detection whether the hashtable should be preallocated
/// (since if there is WHERE then it can filter out too much)
bool
hasWhere
()
const
{
return
!
where
.
empty
();
}
bool
hasWhere
()
const
{
return
!
configuration
.
where
.
empty
();
}
private:
std
::
string
getUpdateFieldAndDate
();
...
...
@@ -61,21 +73,11 @@ private:
std
::
chrono
::
time_point
<
std
::
chrono
::
system_clock
>
update_time
;
const
DictionaryStructure
dict_struct
;
const
bool
secure
;
const
std
::
string
host
;
const
UInt16
port
;
const
std
::
string
user
;
const
std
::
string
password
;
const
std
::
string
db
;
const
std
::
string
table
;
const
std
::
string
where
;
const
std
::
string
update_field
;
std
::
string
invalidate_query
;
const
Configuration
configuration
;
mutable
std
::
string
invalidate_query_response
;
ExternalQueryBuilder
query_builder
;
Block
sample_block
;
Context
context
;
const
bool
is_local
;
ConnectionPoolWithFailoverPtr
pool
;
const
std
::
string
load_all_query
;
Poco
::
Logger
*
log
=
&
Poco
::
Logger
::
get
(
"ClickHouseDictionarySource"
);
...
...
tests/queries/0_stateless/01780_clickhouse_dictionary_source_loop.reference
0 → 100644
浏览文件 @
07d5a1ec
1 1
2 2
3 3
tests/queries/0_stateless/01780_clickhouse_dictionary_source_loop.sql
0 → 100644
浏览文件 @
07d5a1ec
DROP
DICTIONARY
IF
EXISTS
dict1
;
CREATE
DICTIONARY
dict1
(
id
UInt64
,
value
String
)
PRIMARY
KEY
id
SOURCE
(
CLICKHOUSE
(
HOST
'localhost'
PORT
9000
TABLE
'dict1'
))
LAYOUT
(
DIRECT
());
SELECT
*
FROM
dict1
;
--{serverError 36}
DROP
DICTIONARY
dict1
;
DROP
DICTIONARY
IF
EXISTS
dict2
;
CREATE
DICTIONARY
default
.
dict2
(
id
UInt64
,
value
String
)
PRIMARY
KEY
id
SOURCE
(
CLICKHOUSE
(
HOST
'localhost'
PORT
9000
TABLE
'dict2'
))
LAYOUT
(
DIRECT
());
SELECT
*
FROM
dict2
;
--{serverError 36}
DROP
DICTIONARY
dict2
;
DROP
DATABASE
IF
EXISTS
01780
_db
;
CREATE
DATABASE
01780
_db
;
DROP
DICTIONARY
IF
EXISTS
dict3
;
CREATE
DICTIONARY
01780
_db
.
dict3
(
id
UInt64
,
value
String
)
PRIMARY
KEY
id
SOURCE
(
CLICKHOUSE
(
HOST
'localhost'
PORT
9000
DATABASE
'01780_db'
TABLE
'dict3'
))
LAYOUT
(
DIRECT
());
SELECT
*
FROM
01780
_db
.
dict3
;
--{serverError 36}
DROP
DICTIONARY
01780
_db
.
dict3
;
CREATE
TABLE
01780
_db
.
dict3_source
(
id
UInt64
,
value
String
)
ENGINE
=
TinyLog
;
INSERT
INTO
01780
_db
.
dict3_source
VALUES
(
1
,
'1'
),
(
2
,
'2'
),
(
3
,
'3'
);
CREATE
DICTIONARY
01780
_db
.
dict3
(
id
UInt64
,
value
String
)
PRIMARY
KEY
id
SOURCE
(
CLICKHOUSE
(
HOST
'localhost'
PORT
9000
TABLE
'dict3_source'
DATABASE
'01780_db'
))
LAYOUT
(
DIRECT
());
SELECT
*
FROM
01780
_db
.
dict3
;
DROP
DICTIONARY
01780
_db
.
dict3
;
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录