Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
67059d8e
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
67059d8e
编写于
5月 28, 2019
作者:
C
comunodi
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Add tests only for kv storages
上级
a8ce7530
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
474 addition
and
105 deletion
+474
-105
dbms/tests/integration/pytest.ini
dbms/tests/integration/pytest.ini
+1
-1
dbms/tests/integration/test_external_dictionaries/dictionary.py
...ests/integration/test_external_dictionaries/dictionary.py
+12
-15
dbms/tests/integration/test_external_dictionaries/external_sources.py
...ntegration/test_external_dictionaries/external_sources.py
+87
-24
dbms/tests/integration/test_external_dictionaries/test.py
dbms/tests/integration/test_external_dictionaries/test.py
+53
-65
dbms/tests/integration/test_external_dictionaries/test_kv.py
dbms/tests/integration/test_external_dictionaries/test_kv.py
+321
-0
未找到文件。
dbms/tests/integration/pytest.ini
浏览文件 @
67059d8e
[pytest]
python_files
=
test.py
python_files
=
test
*
.py
norecursedirs
=
_instances
dbms/tests/integration/test_external_dictionaries/dictionary.py
浏览文件 @
67059d8e
#-*- coding: utf-8 -*-
#
-*- coding: utf-8 -*-
import
copy
...
...
@@ -9,7 +9,7 @@ class Layout(object):
'cache'
:
'<cache><size_in_cells>128</size_in_cells></cache>'
,
'complex_key_hashed'
:
'<complex_key_hashed/>'
,
'complex_key_cache'
:
'<complex_key_cache><size_in_cells>128</size_in_cells></complex_key_cache>'
,
'range_hashed'
:
'<range_hashed/>'
'range_hashed'
:
'<range_hashed/>'
,
}
def
__init__
(
self
,
name
):
...
...
@@ -18,13 +18,13 @@ class Layout(object):
self
.
is_simple
=
False
self
.
is_ranged
=
False
if
self
.
name
.
startswith
(
'complex'
):
self
.
layout_type
=
"complex"
self
.
layout_type
=
'complex'
self
.
is_complex
=
True
elif
name
.
startswith
(
"range"
):
self
.
layout_type
=
"ranged"
elif
name
.
startswith
(
'range'
):
self
.
layout_type
=
'ranged'
self
.
is_ranged
=
True
else
:
self
.
layout_type
=
"simple"
self
.
layout_type
=
'simple'
self
.
is_simple
=
True
def
get_str
(
self
):
...
...
@@ -33,8 +33,7 @@ class Layout(object):
def
get_key_block_name
(
self
):
if
self
.
is_complex
:
return
'key'
else
:
return
'id'
return
'id'
class
Row
(
object
):
...
...
@@ -90,13 +89,12 @@ class Field(object):
class
DictionaryStructure
(
object
):
def
__init__
(
self
,
layout
,
fields
,
is_kv
=
False
):
def
__init__
(
self
,
layout
,
fields
):
self
.
layout
=
layout
self
.
keys
=
[]
self
.
range_key
=
None
self
.
ordinary_fields
=
[]
self
.
range_fields
=
[]
self
.
is_kv
=
is_kv
for
field
in
fields
:
if
field
.
is_key
:
...
...
@@ -121,14 +119,12 @@ class DictionaryStructure(object):
fields_strs
=
[]
for
field
in
self
.
ordinary_fields
:
fields_strs
.
append
(
field
.
get_attribute_str
())
if
self
.
is_kv
:
break
key_strs
=
[]
if
self
.
layout
.
is_complex
:
for
key_field
in
self
.
keys
:
key_strs
.
append
(
key_field
.
get_attribute_str
())
else
:
# same for simple and ranged
else
:
# same for simple and ranged
for
key_field
in
self
.
keys
:
key_strs
.
append
(
key_field
.
get_simple_index_str
())
...
...
@@ -288,13 +284,14 @@ class DictionaryStructure(object):
class
Dictionary
(
object
):
def
__init__
(
self
,
name
,
structure
,
source
,
config_path
,
table_name
):
def
__init__
(
self
,
name
,
structure
,
source
,
config_path
,
table_name
,
fields
=
None
,
values
=
None
):
self
.
name
=
name
self
.
structure
=
copy
.
deepcopy
(
structure
)
self
.
source
=
copy
.
deepcopy
(
source
)
self
.
config_path
=
config_path
self
.
table_name
=
table_name
self
.
is_kv
=
source
.
is_kv
self
.
fields
=
fields
self
.
values
=
values
def
generate_config
(
self
):
with
open
(
self
.
config_path
,
'w'
)
as
result
:
...
...
dbms/tests/integration/test_external_dictionaries/external_sources.py
浏览文件 @
67059d8e
...
...
@@ -3,6 +3,7 @@ import warnings
import
pymysql.cursors
import
pymongo
import
redis
import
aerospike
from
tzlocal
import
get_localzone
import
datetime
import
os
...
...
@@ -12,7 +13,7 @@ import time
class
ExternalSource
(
object
):
def
__init__
(
self
,
name
,
internal_hostname
,
internal_port
,
docker_hostname
,
docker_port
,
user
,
password
,
is_kv
):
docker_hostname
,
docker_port
,
user
,
password
,
storage_type
=
None
):
self
.
name
=
name
self
.
internal_hostname
=
internal_hostname
self
.
internal_port
=
int
(
internal_port
)
...
...
@@ -20,7 +21,7 @@ class ExternalSource(object):
self
.
docker_port
=
int
(
docker_port
)
self
.
user
=
user
self
.
password
=
password
self
.
is_kv
=
is_kv
self
.
storage_type
=
storage_type
def
get_source_str
(
self
,
table_name
):
raise
NotImplementedError
(
"Method {} is not implemented for {}"
.
format
(
...
...
@@ -38,9 +39,6 @@ class ExternalSource(object):
def
compatible_with_layout
(
self
,
layout
):
return
True
def
prepare_value_for_type
(
self
,
field
,
value
):
return
value
class
SourceMySQL
(
ExternalSource
):
TYPE_MAPPING
=
{
...
...
@@ -388,10 +386,12 @@ class SourceRedis(ExternalSource):
<host>{host}</host>
<port>{port}</port>
<db_index>0</db_index>
<storage_type>{storage_type}</storage_type>
</redis>
'''
.
format
(
host
=
self
.
docker_hostname
,
port
=
self
.
docker_port
,
storage_type
=
self
.
storage_type
,
# simple or hash_map
)
def
prepare
(
self
,
structure
,
table_name
,
cluster
):
...
...
@@ -399,33 +399,96 @@ class SourceRedis(ExternalSource):
self
.
prepared
=
True
def
load_data
(
self
,
data
,
table_name
):
for
row_num
,
row
in
enumerate
(
data
):
# FIXME: yield
self
.
client
.
execute_command
(
"FLUSHDB"
)
self
.
client
.
flushdb
()
for
row
in
data
:
for
cell_name
,
cell_value
in
row
.
data
.
items
():
value_type
=
"$"
if
isinstance
(
cell_value
,
int
):
value_type
=
":"
else
:
cell_value
=
'"'
+
str
(
cell_value
).
replace
(
' '
,
'\s'
)
+
'"'
cmd
=
"SET
"
+
"$"
+
cell_name
+
" "
+
value_type
+
str
(
cell_value
)
cmd
=
"SET
${} {}{}"
.
format
(
cell_name
,
value_type
,
cell_value
)
print
(
cmd
)
self
.
client
.
execute_command
(
cmd
)
return
def
load_kv_data
(
self
,
values
):
self
.
client
.
flushdb
()
if
len
(
values
[
0
])
==
2
:
self
.
client
.
mset
({
value
[
0
]:
value
[
1
]
for
value
in
values
})
else
:
for
value
in
values
:
self
.
client
.
hset
(
value
[
0
],
value
[
1
],
value
[
2
])
def
compatible_with_layout
(
self
,
layout
):
if
not
layout
.
is_simple
:
return
False
return
True
if
layout
.
is_simple
and
self
.
storage_type
==
"simple"
or
layout
.
is_complex
and
self
.
storage_type
==
"simple"
:
return
True
return
False
class
SourceAerospike
(
ExternalSource
):
def
__init__
(
self
,
name
,
internal_hostname
,
internal_port
,
docker_hostname
,
docker_port
,
user
,
password
,
storage_type
=
None
):
ExternalSource
.
__init__
(
self
,
name
,
internal_hostname
,
internal_port
,
docker_hostname
,
docker_port
,
user
,
password
,
storage_type
)
self
.
namespace
=
"test"
self
.
set
=
"test_set"
def
prepare_value_for_type
(
self
,
field
,
value
):
if
field
.
field_type
==
"Date"
:
dt
=
dateutil
.
parser
.
parse
(
value
)
return
int
(
time
.
mktime
(
dt
.
timetuple
())
//
86400
)
if
field
.
field_type
==
"DateTime"
:
dt
=
dateutil
.
parser
.
parse
(
value
)
return
int
(
time
.
mktime
(
dt
.
timetuple
()))
if
field
.
field_type
==
"Float32"
:
return
str
(
value
)
if
field
.
field_type
==
"Float64"
:
return
str
(
value
)
return
value
def
get_source_str
(
self
,
table_name
):
print
(
"AEROSPIKE get source str"
)
return
'''
<aerospike>
<host>{host}</host>
<port>{port}</port>
</aerospike>
'''
.
format
(
host
=
self
.
docker_hostname
,
port
=
self
.
docker_port
,
storage_type
=
self
.
storage_type
,
# simple or hash_map
)
def
prepare
(
self
,
structure
,
table_name
,
cluster
):
config
=
{
'hosts'
:
[
(
self
.
internal_hostname
,
self
.
internal_port
)
]
}
self
.
client
=
aerospike
.
client
(
config
).
connect
()
self
.
prepared
=
True
print
(
"PREPARED AEROSPIKE"
)
print
(
config
)
def
compatible_with_layout
(
self
,
layout
):
print
(
"compatible AEROSPIKE"
)
return
layout
.
is_simple
def
_flush_aerospike_db
(
self
):
keys
=
[]
def
handle_record
((
key
,
metadata
,
record
)):
print
(
"Handle record {} {}"
.
format
(
key
,
record
))
keys
.
append
(
key
)
def
print_record
((
key
,
metadata
,
record
)):
print
(
"Print record {} {}"
.
format
(
key
,
record
))
scan
=
self
.
client
.
scan
(
self
.
namespace
,
self
.
set
)
scan
.
foreach
(
handle_record
)
[
self
.
client
.
remove
(
key
)
for
key
in
keys
]
def
load_kv_data
(
self
,
values
):
self
.
_flush_aerospike_db
()
print
(
"Load KV Data Aerospike"
)
if
len
(
values
[
0
])
==
2
:
for
value
in
values
:
key
=
(
self
.
namespace
,
self
.
set
,
value
[
0
])
print
(
key
)
self
.
client
.
put
(
key
,
{
"bin_value"
:
value
[
1
]},
policy
=
{
"key"
:
aerospike
.
POLICY_KEY_SEND
})
assert
self
.
client
.
exists
(
key
)
else
:
assert
(
"VALUES SIZE != 2"
)
# print(values)
def
load_data
(
self
,
data
,
table_name
):
print
(
"Load Data Aerospike"
)
# print(data)
dbms/tests/integration/test_external_dictionaries/test.py
浏览文件 @
67059d8e
...
...
@@ -3,8 +3,8 @@ import os
from
helpers.cluster
import
ClickHouseCluster
from
dictionary
import
Field
,
Row
,
Dictionary
,
DictionaryStructure
,
Layout
from
external_sources
import
SourceMySQL
,
SourceClickHouse
,
SourceFile
,
SourceExecutableCache
,
SourceExecutableHashed
,
SourceMongo
from
external_sources
import
Source
HTTP
,
SourceHTTPS
,
SourceRedis
from
external_sources
import
SourceMySQL
,
SourceClickHouse
,
SourceFile
,
SourceExecutableCache
,
SourceExecutableHashed
from
external_sources
import
Source
Mongo
,
SourceHTTP
,
SourceHTTPS
SCRIPT_DIR
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
...
...
@@ -78,16 +78,15 @@ LAYOUTS = [
]
SOURCES
=
[
SourceRedis
(
"Redis"
,
"localhost"
,
"6380"
,
"redis1"
,
"6379"
,
""
,
""
,
True
),
SourceMongo
(
"MongoDB"
,
"localhost"
,
"27018"
,
"mongo1"
,
"27017"
,
"root"
,
"clickhouse"
,
False
),
SourceMySQL
(
"MySQL"
,
"localhost"
,
"3308"
,
"mysql1"
,
"3306"
,
"root"
,
"clickhouse"
,
False
),
SourceClickHouse
(
"RemoteClickHouse"
,
"localhost"
,
"9000"
,
"clickhouse1"
,
"9000"
,
"default"
,
""
,
False
),
SourceClickHouse
(
"LocalClickHouse"
,
"localhost"
,
"9000"
,
"node"
,
"9000"
,
"default"
,
""
,
False
),
SourceFile
(
"File"
,
"localhost"
,
"9000"
,
"node"
,
"9000"
,
""
,
""
,
False
),
SourceExecutableHashed
(
"ExecutableHashed"
,
"localhost"
,
"9000"
,
"node"
,
"9000"
,
""
,
""
,
False
),
SourceExecutableCache
(
"ExecutableCache"
,
"localhost"
,
"9000"
,
"node"
,
"9000"
,
""
,
""
,
False
),
SourceHTTP
(
"SourceHTTP"
,
"localhost"
,
"9000"
,
"clickhouse1"
,
"9000"
,
""
,
""
,
False
),
SourceHTTPS
(
"SourceHTTPS"
,
"localhost"
,
"9000"
,
"clickhouse1"
,
"9000"
,
""
,
""
,
False
),
SourceMongo
(
"MongoDB"
,
"localhost"
,
"27018"
,
"mongo1"
,
"27017"
,
"root"
,
"clickhouse"
),
SourceMySQL
(
"MySQL"
,
"localhost"
,
"3308"
,
"mysql1"
,
"3306"
,
"root"
,
"clickhouse"
),
SourceClickHouse
(
"RemoteClickHouse"
,
"localhost"
,
"9000"
,
"clickhouse1"
,
"9000"
,
"default"
,
""
),
SourceClickHouse
(
"LocalClickHouse"
,
"localhost"
,
"9000"
,
"node"
,
"9000"
,
"default"
,
""
),
SourceFile
(
"File"
,
"localhost"
,
"9000"
,
"node"
,
"9000"
,
""
,
""
),
SourceExecutableHashed
(
"ExecutableHashed"
,
"localhost"
,
"9000"
,
"node"
,
"9000"
,
""
,
""
),
SourceExecutableCache
(
"ExecutableCache"
,
"localhost"
,
"9000"
,
"node"
,
"9000"
,
""
,
""
),
SourceHTTP
(
"SourceHTTP"
,
"localhost"
,
"9000"
,
"clickhouse1"
,
"9000"
,
""
,
""
),
SourceHTTPS
(
"SourceHTTPS"
,
"localhost"
,
"9000"
,
"clickhouse1"
,
"9000"
,
""
,
""
),
]
DICTIONARIES
=
[]
...
...
@@ -95,6 +94,7 @@ DICTIONARIES = []
cluster
=
None
node
=
None
def
setup_module
(
module
):
global
DICTIONARIES
global
cluster
...
...
@@ -107,9 +107,9 @@ def setup_module(module):
for
layout
in
LAYOUTS
:
for
source
in
SOURCES
:
if
source
.
compatible_with_layout
(
layout
):
structure
=
DictionaryStructure
(
layout
,
FIELDS
[
layout
.
layout_type
]
,
source
.
is_kv
)
structure
=
DictionaryStructure
(
layout
,
FIELDS
[
layout
.
layout_type
])
dict_name
=
source
.
name
+
"_"
+
layout
.
name
dict_path
=
os
.
path
.
join
(
dict_configs_path
,
dict_name
+
'.xml'
)
# FIXME: single xml config for every column
dict_path
=
os
.
path
.
join
(
dict_configs_path
,
dict_name
+
'.xml'
)
dictionary
=
Dictionary
(
dict_name
,
structure
,
source
,
dict_path
,
"table_"
+
dict_name
)
dictionary
.
generate_config
()
DICTIONARIES
.
append
(
dictionary
)
...
...
@@ -120,9 +120,10 @@ def setup_module(module):
for
fname
in
os
.
listdir
(
dict_configs_path
):
main_configs
.
append
(
os
.
path
.
join
(
dict_configs_path
,
fname
))
cluster
=
ClickHouseCluster
(
__file__
,
base_configs_dir
=
os
.
path
.
join
(
SCRIPT_DIR
,
'configs'
))
node
=
cluster
.
add_instance
(
'node'
,
main_configs
=
main_configs
,
with_mysql
=
True
,
with_mongo
=
True
,
with_redis
=
True
)
node
=
cluster
.
add_instance
(
'node'
,
main_configs
=
main_configs
,
with_mysql
=
True
,
with_mongo
=
True
)
cluster
.
add_instance
(
'clickhouse1'
)
@
pytest
.
fixture
(
scope
=
"module"
)
def
started_cluster
():
try
:
...
...
@@ -137,39 +138,28 @@ def started_cluster():
finally
:
cluster
.
shutdown
()
def
prepare_row
(
dct
,
fields
,
values
):
prepared_values
=
[]
for
field
,
value
in
zip
(
fields
,
values
):
prepared_values
.
append
(
dct
.
source
.
prepare_value_for_type
(
field
,
value
))
return
Row
(
fields
,
prepared_values
)
def
prepare_data
(
dct
,
fields
,
values_by_row
):
data
=
[]
for
row
in
values_by_row
:
data
.
append
(
prepare_row
(
dct
,
fields
,
row
))
return
data
def
test_simple_dictionaries
(
started_cluster
):
fields
=
FIELDS
[
"simple"
]
values_by_row
=
[
[
1
,
22
,
333
,
4444
,
55555
,
-
6
,
-
77
,
-
888
,
-
999
,
'550e8400-e29b-41d4-a716-446655440003'
,
'1973-06-28'
,
'1985-02-28 23:43:25'
,
'hello'
,
22.543
,
3332154213.4
,
0
],
[
2
,
3
,
4
,
5
,
6
,
-
7
,
-
8
,
-
9
,
-
10
,
'550e8400-e29b-41d4-a716-446655440002'
,
'1978-06-28'
,
'1986-02-28 23:42:25'
,
'hello'
,
21.543
,
3222154213.4
,
1
],
data
=
[
Row
(
fields
,
[
1
,
22
,
333
,
4444
,
55555
,
-
6
,
-
77
,
-
888
,
-
999
,
'550e8400-e29b-41d4-a716-446655440003'
,
'1973-06-28'
,
'1985-02-28 23:43:25'
,
'hello'
,
22.543
,
3332154213.4
,
0
]),
Row
(
fields
,
[
2
,
3
,
4
,
5
,
6
,
-
7
,
-
8
,
-
9
,
-
10
,
'550e8400-e29b-41d4-a716-446655440002'
,
'1978-06-28'
,
'1986-02-28 23:42:25'
,
'hello'
,
21.543
,
3222154213.4
,
1
]),
]
simple_dicts
=
[
d
for
d
in
DICTIONARIES
if
d
.
structure
.
layout
.
layout_type
==
"simple"
]
for
dct
in
simple_dicts
:
data
=
prepare_data
(
dct
,
fields
,
values_by_row
)
dct
.
load_data
(
data
)
node
.
query
(
"system reload dictionaries"
)
queries_with_answers
=
[]
for
dct
in
simple_dicts
:
data
=
prepare_data
(
dct
,
fields
,
values_by_row
)
for
row
in
data
:
for
field
in
fields
:
if
not
field
.
is_key
:
...
...
@@ -181,8 +171,6 @@ def test_simple_dictionaries(started_cluster):
for
query
in
dct
.
get_select_get_or_default_queries
(
field
,
row
):
queries_with_answers
.
append
((
query
,
field
.
default_value_for_get
))
if
dct
.
is_kv
:
break
for
query
in
dct
.
get_hierarchical_queries
(
data
[
0
]):
queries_with_answers
.
append
((
query
,
[
1
]))
...
...
@@ -201,29 +189,30 @@ def test_simple_dictionaries(started_cluster):
answer
=
str
(
answer
).
replace
(
' '
,
''
)
assert
node
.
query
(
query
)
==
str
(
answer
)
+
'
\n
'
def
test_complex_dictionaries
(
started_cluster
):
fields
=
FIELDS
[
"complex"
]
values_by_row
=
[
[
1
,
'world'
,
22
,
333
,
4444
,
55555
,
-
6
,
-
77
,
-
888
,
-
999
,
'550e8400-e29b-41d4-a716-446655440003'
,
'1973-06-28'
,
'1985-02-28 23:43:25'
,
'hello'
,
22.543
,
3332154213.4
],
[
2
,
'qwerty2'
,
52
,
2345
,
6544
,
9191991
,
-
2
,
-
717
,
-
81818
,
-
92929
,
'550e8400-e29b-41d4-a716-446655440007'
,
'1975-09-28'
,
'2000-02-28 23:33:24'
,
'my'
,
255.543
,
3332221.44
],
data
=
[
Row
(
fields
,
[
1
,
'world'
,
22
,
333
,
4444
,
55555
,
-
6
,
-
77
,
-
888
,
-
999
,
'550e8400-e29b-41d4-a716-446655440003'
,
'1973-06-28'
,
'1985-02-28 23:43:25'
,
'hello'
,
22.543
,
3332154213.4
]),
Row
(
fields
,
[
2
,
'qwerty2'
,
52
,
2345
,
6544
,
9191991
,
-
2
,
-
717
,
-
81818
,
-
92929
,
'550e8400-e29b-41d4-a716-446655440007'
,
'1975-09-28'
,
'2000-02-28 23:33:24'
,
'my'
,
255.543
,
3332221.44
]),
]
complex_dicts
=
[
d
for
d
in
DICTIONARIES
if
d
.
structure
.
layout
.
layout_type
==
"complex"
and
not
d
.
is_kv
]
complex_dicts
=
[
d
for
d
in
DICTIONARIES
if
d
.
structure
.
layout
.
layout_type
==
"complex"
]
for
dct
in
complex_dicts
:
data
=
prepare_data
(
dct
,
fields
,
values_by_row
)
dct
.
load_data
(
data
)
node
.
query
(
"system reload dictionaries"
)
queries_with_answers
=
[]
for
dct
in
complex_dicts
:
data
=
prepare_data
(
dct
,
fields
,
values_by_row
)
for
row
in
data
:
for
field
in
fields
:
if
not
field
.
is_key
:
...
...
@@ -240,38 +229,37 @@ def test_complex_dictionaries(started_cluster):
print
query
assert
node
.
query
(
query
)
==
str
(
answer
)
+
'
\n
'
def
test_ranged_dictionaries
(
started_cluster
):
fields
=
FIELDS
[
"ranged"
]
values_by_row
=
[
[
1
,
'2019-02-10'
,
'2019-02-01'
,
'2019-02-28'
,
22
,
333
,
4444
,
55555
,
-
6
,
-
77
,
-
888
,
-
999
,
'550e8400-e29b-41d4-a716-446655440003'
,
'1973-06-28'
,
'1985-02-28 23:43:25'
,
'hello'
,
22.543
,
3332154213.4
],
[
2
,
'2019-04-10'
,
'2019-04-01'
,
'2019-04-28'
,
11
,
3223
,
41444
,
52515
,
-
65
,
-
747
,
-
8388
,
-
9099
,
'550e8400-e29b-41d4-a716-446655440004'
,
'1973-06-29'
,
'2002-02-28 23:23:25'
,
'!!!!'
,
32.543
,
3332543.4
],
data
=
[
Row
(
fields
,
[
1
,
'2019-02-10'
,
'2019-02-01'
,
'2019-02-28'
,
22
,
333
,
4444
,
55555
,
-
6
,
-
77
,
-
888
,
-
999
,
'550e8400-e29b-41d4-a716-446655440003'
,
'1973-06-28'
,
'1985-02-28 23:43:25'
,
'hello'
,
22.543
,
3332154213.4
]),
Row
(
fields
,
[
2
,
'2019-04-10'
,
'2019-04-01'
,
'2019-04-28'
,
11
,
3223
,
41444
,
52515
,
-
65
,
-
747
,
-
8388
,
-
9099
,
'550e8400-e29b-41d4-a716-446655440004'
,
'1973-06-29'
,
'2002-02-28 23:23:25'
,
'!!!!'
,
32.543
,
3332543.4
]),
]
ranged_dicts
=
[
d
for
d
in
DICTIONARIES
if
d
.
structure
.
layout
.
layout_type
==
"ranged"
and
not
d
.
is_kv
]
ranged_dicts
=
[
d
for
d
in
DICTIONARIES
if
d
.
structure
.
layout
.
layout_type
==
"ranged"
]
for
dct
in
ranged_dicts
:
data
=
prepare_data
(
dct
,
fields
,
values_by_row
)
dct
.
load_data
(
data
)
node
.
query
(
"system reload dictionaries"
)
queries_with_answers
=
[]
for
dct
in
ranged_dicts
:
data
=
prepare_data
(
dct
,
fields
,
values_by_row
)
for
row
in
data
:
for
field
in
fields
:
if
not
field
.
is_key
and
not
field
.
is_range
:
for
query
in
dct
.
get_select_get_queries
(
field
,
row
):
queries_with_answers
.
append
((
query
,
row
.
get_value_by_name
(
field
.
name
)))
if
dct
.
is_kv
:
break
for
query
,
answer
in
queries_with_answers
:
print
query
...
...
dbms/tests/integration/test_external_dictionaries/test_kv.py
0 → 100644
浏览文件 @
67059d8e
import
os
import
pytest
from
dictionary
import
Field
,
Row
,
Dictionary
,
DictionaryStructure
,
Layout
from
external_sources
import
SourceRedis
,
SourceAerospike
from
helpers.cluster
import
ClickHouseCluster
SCRIPT_DIR
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
FIELDS
=
{
"simple"
:
[
Field
(
"KeyField"
,
'UInt64'
,
is_key
=
True
,
default_value_for_get
=
9999999
),
Field
(
"UInt8_"
,
'UInt8'
,
default_value_for_get
=
55
),
Field
(
"UInt16_"
,
'UInt16'
,
default_value_for_get
=
66
),
Field
(
"UInt32_"
,
'UInt32'
,
default_value_for_get
=
77
),
Field
(
"UInt64_"
,
'UInt64'
,
default_value_for_get
=
88
),
Field
(
"Int8_"
,
'Int8'
,
default_value_for_get
=-
55
),
Field
(
"Int16_"
,
'Int16'
,
default_value_for_get
=-
66
),
Field
(
"Int32_"
,
'Int32'
,
default_value_for_get
=-
77
),
Field
(
"Int64_"
,
'Int64'
,
default_value_for_get
=-
88
),
Field
(
"UUID_"
,
'UUID'
,
default_value_for_get
=
'550e8400-0000-0000-0000-000000000000'
),
Field
(
"Date_"
,
'Date'
,
default_value_for_get
=
'2018-12-30'
),
Field
(
"DateTime_"
,
'DateTime'
,
default_value_for_get
=
'2018-12-30 00:00:00'
),
Field
(
"String_"
,
'String'
,
default_value_for_get
=
'hi'
),
Field
(
"Float32_"
,
'Float32'
,
default_value_for_get
=
555.11
),
Field
(
"Float64_"
,
'Float64'
,
default_value_for_get
=
777.11
),
Field
(
"ParentKeyField"
,
"UInt64"
,
default_value_for_get
=
444
,
hierarchical
=
True
),
],
"complex"
:
[
Field
(
"KeyField1"
,
'UInt64'
,
is_key
=
True
,
default_value_for_get
=
9999999
),
Field
(
"KeyField2"
,
'String'
,
is_key
=
True
,
default_value_for_get
=
'xxxxxxxxx'
),
Field
(
"UInt8_"
,
'UInt8'
,
default_value_for_get
=
55
),
Field
(
"UInt16_"
,
'UInt16'
,
default_value_for_get
=
66
),
Field
(
"UInt32_"
,
'UInt32'
,
default_value_for_get
=
77
),
Field
(
"UInt64_"
,
'UInt64'
,
default_value_for_get
=
88
),
Field
(
"Int8_"
,
'Int8'
,
default_value_for_get
=-
55
),
Field
(
"Int16_"
,
'Int16'
,
default_value_for_get
=-
66
),
Field
(
"Int32_"
,
'Int32'
,
default_value_for_get
=-
77
),
Field
(
"Int64_"
,
'Int64'
,
default_value_for_get
=-
88
),
Field
(
"UUID_"
,
'UUID'
,
default_value_for_get
=
'550e8400-0000-0000-0000-000000000000'
),
Field
(
"Date_"
,
'Date'
,
default_value_for_get
=
'2018-12-30'
),
Field
(
"DateTime_"
,
'DateTime'
,
default_value_for_get
=
'2018-12-30 00:00:00'
),
Field
(
"String_"
,
'String'
,
default_value_for_get
=
'hi'
),
Field
(
"Float32_"
,
'Float32'
,
default_value_for_get
=
555.11
),
Field
(
"Float64_"
,
'Float64'
,
default_value_for_get
=
777.11
),
],
"ranged"
:
[
Field
(
"KeyField1"
,
'UInt64'
,
is_key
=
True
),
Field
(
"KeyField2"
,
'Date'
,
is_range_key
=
True
),
Field
(
"StartDate"
,
'Date'
,
range_hash_type
=
'min'
),
Field
(
"EndDate"
,
'Date'
,
range_hash_type
=
'max'
),
Field
(
"UInt8_"
,
'UInt8'
,
default_value_for_get
=
55
),
Field
(
"UInt16_"
,
'UInt16'
,
default_value_for_get
=
66
),
Field
(
"UInt32_"
,
'UInt32'
,
default_value_for_get
=
77
),
Field
(
"UInt64_"
,
'UInt64'
,
default_value_for_get
=
88
),
Field
(
"Int8_"
,
'Int8'
,
default_value_for_get
=-
55
),
Field
(
"Int16_"
,
'Int16'
,
default_value_for_get
=-
66
),
Field
(
"Int32_"
,
'Int32'
,
default_value_for_get
=-
77
),
Field
(
"Int64_"
,
'Int64'
,
default_value_for_get
=-
88
),
Field
(
"UUID_"
,
'UUID'
,
default_value_for_get
=
'550e8400-0000-0000-0000-000000000000'
),
Field
(
"Date_"
,
'Date'
,
default_value_for_get
=
'2018-12-30'
),
Field
(
"DateTime_"
,
'DateTime'
,
default_value_for_get
=
'2018-12-30 00:00:00'
),
Field
(
"String_"
,
'String'
,
default_value_for_get
=
'hi'
),
Field
(
"Float32_"
,
'Float32'
,
default_value_for_get
=
555.11
),
Field
(
"Float64_"
,
'Float64'
,
default_value_for_get
=
777.11
),
],
}
VALUES
=
{
"simple"
:
[
[
1
,
22
,
333
,
4444
,
55555
,
-
6
,
-
77
,
-
888
,
-
999
,
'550e8400-e29b-41d4-a716-446655440003'
,
'1973-06-28'
,
'1985-02-28 23:43:25'
,
'hello'
,
22.543
,
3332154213.4
,
0
,
],
[
2
,
3
,
4
,
5
,
6
,
-
7
,
-
8
,
-
9
,
-
10
,
'550e8400-e29b-41d4-a716-446655440002'
,
'1978-06-28'
,
'1986-02-28 23:42:25'
,
'hello'
,
21.543
,
3222154213.4
,
1
,
],
],
"complex"
:
[
[
1
,
'world'
,
22
,
333
,
4444
,
55555
,
-
6
,
-
77
,
-
888
,
-
999
,
'550e8400-e29b-41d4-a716-446655440003'
,
'1973-06-28'
,
'1985-02-28 23:43:25'
,
'hello'
,
22.543
,
3332154213.4
,
],
[
2
,
'qwerty2'
,
52
,
2345
,
6544
,
9191991
,
-
2
,
-
717
,
-
81818
,
-
92929
,
'550e8400-e29b-41d4-a716-446655440007'
,
'1975-09-28'
,
'2000-02-28 23:33:24'
,
'my'
,
255.543
,
3332221.44
,
],
],
"ranged"
:
[
[
1
,
'2019-02-10'
,
'2019-02-01'
,
'2019-02-28'
,
22
,
333
,
4444
,
55555
,
-
6
,
-
77
,
-
888
,
-
999
,
'550e8400-e29b-41d4-a716-446655440003'
,
'1973-06-28'
,
'1985-02-28 23:43:25'
,
'hello'
,
22.543
,
3332154213.4
,
],
[
2
,
'2019-04-10'
,
'2019-04-01'
,
'2019-04-28'
,
11
,
3223
,
41444
,
52515
,
-
65
,
-
747
,
-
8388
,
-
9099
,
'550e8400-e29b-41d4-a716-446655440004'
,
'1973-06-29'
,
'2002-02-28 23:23:25'
,
'!!!!'
,
32.543
,
3332543.4
,
],
],
}
LAYOUTS
=
[
Layout
(
"flat"
),
Layout
(
"hashed"
),
Layout
(
"cache"
),
Layout
(
"complex_key_hashed"
),
Layout
(
"complex_key_cache"
),
Layout
(
"range_hashed"
),
]
SOURCES
=
[
SourceRedis
(
"RedisSimple"
,
"localhost"
,
"6380"
,
"redis1"
,
"6379"
,
""
,
""
,
storage_type
=
"simple"
),
# SourceRedis("RedisHash", "localhost", "6380", "redis1", "6379", "", "", storage_type="hash_map"),
# SourceAerospike("Aerospike", "localhost", "3000", "aerospike1", "3000", "", ""),
]
DICTIONARIES
=
[]
cluster
=
None
node
=
None
def
setup_kv_dict
(
suffix
,
layout
,
fields
,
kv_source
,
dict_configs_path
,
values
):
global
DICTIONARIES
structure
=
DictionaryStructure
(
layout
,
fields
)
dict_name
=
"{}_{}_{}"
.
format
(
kv_source
.
name
,
layout
.
name
,
suffix
)
dict_path
=
os
.
path
.
join
(
dict_configs_path
,
dict_name
+
'.xml'
)
dictionary
=
Dictionary
(
dict_name
,
structure
,
kv_source
,
dict_path
,
"table_"
+
dict_name
,
fields
,
values
)
dictionary
.
generate_config
()
DICTIONARIES
.
append
(
dictionary
)
def
setup_module
(
module
):
global
DICTIONARIES
global
cluster
global
node
dict_configs_path
=
os
.
path
.
join
(
SCRIPT_DIR
,
'configs/dictionaries'
)
for
f
in
os
.
listdir
(
dict_configs_path
):
os
.
remove
(
os
.
path
.
join
(
dict_configs_path
,
f
))
for
layout
in
LAYOUTS
:
for
source
in
SOURCES
:
if
source
.
compatible_with_layout
(
layout
):
if
layout
.
layout_type
==
"simple"
:
fields_len
=
len
(
FIELDS
[
"simple"
])
for
i
in
range
(
fields_len
-
1
):
local_fields
=
[
FIELDS
[
"simple"
][
0
],
FIELDS
[
"simple"
][
i
+
1
]]
local_values
=
[[
value
[
0
],
value
[
i
+
1
]]
for
value
in
VALUES
[
"simple"
]]
setup_kv_dict
(
i
+
1
,
layout
,
local_fields
,
source
,
dict_configs_path
,
local_values
)
elif
layout
.
layout_type
==
"complex"
:
fields_len
=
len
(
FIELDS
[
"complex"
])
for
i
in
range
(
fields_len
-
2
):
local_fields
=
[
FIELDS
[
'complex'
][
1
],
FIELDS
[
'complex'
][
i
+
2
]]
local_values
=
[[
value
[
1
],
value
[
i
+
2
]]
for
value
in
VALUES
[
"complex"
]]
setup_kv_dict
(
i
+
2
,
layout
,
local_fields
,
source
,
dict_configs_path
,
local_values
)
elif
layout
.
layout_type
==
"ranged"
:
fields_len
=
len
(
FIELDS
[
"ranged"
])
local_fields
=
FIELDS
[
"ranged"
][
0
:
5
]
local_values
=
VALUES
[
"ranged"
][
0
:
5
]
for
i
in
range
(
fields_len
-
4
):
local_fields
[
4
]
=
FIELDS
[
"ranged"
][
i
+
4
]
for
j
,
value
in
enumerate
(
VALUES
[
"ranged"
]):
local_values
[
j
][
4
]
=
value
[
i
+
4
]
setup_kv_dict
(
i
+
2
,
layout
,
local_fields
,
source
,
dict_configs_path
,
local_values
)
else
:
print
"Source"
,
source
.
name
,
"incompatible with layout"
,
layout
.
name
main_configs
=
[]
for
fname
in
os
.
listdir
(
dict_configs_path
):
main_configs
.
append
(
os
.
path
.
join
(
dict_configs_path
,
fname
))
cluster
=
ClickHouseCluster
(
__file__
,
base_configs_dir
=
os
.
path
.
join
(
SCRIPT_DIR
,
'configs'
))
# TODO: add your kv source flag below
node
=
cluster
.
add_instance
(
'node'
,
main_configs
=
main_configs
,
with_redis
=
True
)
cluster
.
add_instance
(
'clickhouse1'
)
@
pytest
.
fixture
(
scope
=
"module"
)
def
started_cluster
():
try
:
cluster
.
start
()
for
dictionary
in
DICTIONARIES
:
print
"Preparing"
,
dictionary
.
name
dictionary
.
prepare_source
(
cluster
)
print
"Prepared"
yield
cluster
finally
:
cluster
.
shutdown
()
def
prepare_data
(
fields
,
values_by_row
):
return
[
Row
(
fields
,
values
)
for
values
in
values_by_row
]
def
test_simple_kv_dictionaries
(
started_cluster
):
simple_kv_dicts
=
[
d
for
d
in
DICTIONARIES
if
d
.
structure
.
layout
.
layout_type
==
"simple"
]
for
dct
in
simple_kv_dicts
:
queries_with_answers
=
[]
fields
=
dct
.
fields
print
(
"FIELDS AND VALUES FOR "
+
dct
.
name
)
print
(
fields
)
print
(
dct
.
values
)
data
=
prepare_data
(
fields
,
dct
.
values
)
dct
.
source
.
load_kv_data
(
dct
.
values
)
try
:
node
.
query
(
"system reload dictionary '{}'"
.
format
(
dct
.
name
))
except
Exception
:
print
(
dct
.
name
)
raise
for
row
in
data
:
for
field
in
fields
:
if
not
field
.
is_key
:
for
query
in
dct
.
get_select_get_queries
(
field
,
row
):
queries_with_answers
.
append
((
query
,
row
.
get_value_by_name
(
field
.
name
)))
for
query
in
dct
.
get_select_has_queries
(
field
,
row
):
queries_with_answers
.
append
((
query
,
1
))
for
query
in
dct
.
get_select_get_or_default_queries
(
field
,
row
):
queries_with_answers
.
append
((
query
,
field
.
default_value_for_get
))
if
dct
.
fields
[
1
].
hierarchical
:
for
query
in
dct
.
get_hierarchical_queries
(
data
[
0
]):
queries_with_answers
.
append
((
query
,
[
1
]))
for
query
in
dct
.
get_hierarchical_queries
(
data
[
1
]):
queries_with_answers
.
append
((
query
,
[
2
,
1
]))
for
query
in
dct
.
get_is_in_queries
(
data
[
0
],
data
[
1
]):
queries_with_answers
.
append
((
query
,
0
))
for
query
in
dct
.
get_is_in_queries
(
data
[
1
],
data
[
0
]):
queries_with_answers
.
append
((
query
,
1
))
for
query
,
answer
in
queries_with_answers
:
if
isinstance
(
answer
,
list
):
answer
=
str
(
answer
).
replace
(
' '
,
''
)
print
query
assert
node
.
query
(
query
)
==
str
(
answer
)
+
'
\n
'
,
query
def
test_complex_dictionaries
(
started_cluster
):
complex_kv_dicts
=
[
d
for
d
in
DICTIONARIES
if
d
.
structure
.
layout
.
layout_type
==
"complex"
]
for
dct
in
complex_kv_dicts
:
queries_with_answers
=
[]
fields
=
dct
.
fields
print
(
"FIELDS AND VALUES FOR "
+
dct
.
name
)
print
(
fields
)
print
(
dct
.
values
)
data
=
prepare_data
(
fields
,
dct
.
values
)
dct
.
source
.
load_kv_data
(
dct
.
values
)
try
:
node
.
query
(
"system reload dictionary '{}'"
.
format
(
dct
.
name
))
except
Exception
:
print
(
dct
.
name
)
raise
for
row
in
data
:
for
field
in
fields
:
if
not
field
.
is_key
:
for
query
in
dct
.
get_select_get_queries
(
field
,
row
):
queries_with_answers
.
append
((
query
,
row
.
get_value_by_name
(
field
.
name
)))
for
query
in
dct
.
get_select_has_queries
(
field
,
row
):
queries_with_answers
.
append
((
query
,
1
))
for
query
in
dct
.
get_select_get_or_default_queries
(
field
,
row
):
queries_with_answers
.
append
((
query
,
field
.
default_value_for_get
))
for
query
,
answer
in
queries_with_answers
:
print
query
assert
node
.
query
(
query
)
==
str
(
answer
)
+
'
\n
'
def
xtest_ranged_dictionaries
(
started_cluster
):
complex_kv_dicts
=
[
d
for
d
in
DICTIONARIES
if
d
.
structure
.
layout
.
layout_type
==
"ranged"
]
for
dct
in
complex_kv_dicts
:
queries_with_answers
=
[]
fields
=
dct
.
fields
print
(
"FIELDS AND VALUES FOR "
+
dct
.
name
)
print
(
fields
)
print
(
dct
.
values
)
data
=
prepare_data
(
fields
,
dct
.
values
)
dct
.
source
.
load_kv_data
(
dct
.
values
)
try
:
node
.
query
(
"system reload dictionary '{}'"
.
format
(
dct
.
name
))
except
Exception
:
print
(
dct
.
name
)
raise
for
row
in
data
:
for
field
in
fields
:
if
not
field
.
is_key
and
not
field
.
is_range
:
for
query
in
dct
.
get_select_get_queries
(
field
,
row
):
queries_with_answers
.
append
((
query
,
row
.
get_value_by_name
(
field
.
name
)))
for
query
,
answer
in
queries_with_answers
:
print
query
assert
node
.
query
(
query
)
==
str
(
answer
)
+
'
\n
'
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录