提交 010a7e00 编写于 作者: A alesapin

Add mysql dictionaries tests

上级 964f9868
......@@ -98,6 +98,7 @@ class ClickHouseCluster:
self.with_kafka = False
self.with_odbc_drivers = False
self.with_hdfs = False
self.with_mongo = False
self.docker_client = None
self.is_up = False
......@@ -109,7 +110,7 @@ class ClickHouseCluster:
cmd += " client"
return cmd
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, with_hdfs=False, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None):
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None):
"""Add an instance to the cluster.
name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
......@@ -127,7 +128,7 @@ class ClickHouseCluster:
instance = ClickHouseInstance(
self, self.base_dir, name, config_dir, main_configs, user_configs, macros, with_zookeeper,
self.zookeeper_config_path, with_mysql, with_kafka, self.base_configs_dir, self.server_bin_path,
self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, self.base_configs_dir, self.server_bin_path,
self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname,
env_variables=env_variables, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address, ipv6_address=ipv6_address)
......@@ -176,6 +177,11 @@ class ClickHouseCluster:
self.base_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_hdfs.yml')]
if with_mongo and not self.with_mongo:
self.with_mongo = True
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')])
self.base_mongo_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')]
return instance
......@@ -290,6 +296,10 @@ class ClickHouseCluster:
subprocess_check_call(self.base_hdfs_cmd + ['up', '-d', '--force-recreate'])
self.wait_hdfs_to_start(120)
if self.with_mongo and self.base_mongo_cmd:
subprocess_check_call(self.base_mongo_cmd + ['up', '-d', '--force-recreate'])
time.sleep(10)
subprocess_check_call(self.base_cmd + ['up', '-d', '--no-recreate'])
start_deadline = time.time() + 20.0 # seconds
......@@ -361,11 +371,8 @@ services:
cap_add:
- SYS_PTRACE
depends_on: {depends_on}
user: '{user}'
env_file:
- {env_file}
security_opt:
- label:disable
{networks}
{app_net}
{ipv4_address}
......@@ -388,8 +395,9 @@ class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros,
with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, base_configs_dir, server_bin_path, odbc_bridge_bin_path,
clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test",
with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, base_configs_dir,
server_bin_path, odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=None,
env_variables={}, image="yandex/clickhouse-integration-test",
stay_alive=False, ipv4_address=None, ipv6_address=None):
self.name = name
......@@ -412,6 +420,7 @@ class ClickHouseInstance:
self.with_mysql = with_mysql
self.with_kafka = with_kafka
self.with_mongo = with_mongo
self.path = p.join(self.cluster.instances_dir, name)
self.docker_compose_path = p.join(self.path, 'docker_compose.yml')
......@@ -672,7 +681,6 @@ class ClickHouseInstance:
db_dir=db_dir,
logs_dir=logs_dir,
depends_on=str(depends_on),
user=os.getuid(),
env_file=env_file,
odbc_ini_path=odbc_ini_path,
entrypoint_cmd=entrypoint_cmd,
......
# Compose file that defines the single MongoDB service `mongo1`.
version: '2.2'
services:
mongo1:
image: mongo
restart: always
environment:
# Root credentials handed to the mongo image through its init variables.
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: clickhouse
ports:
# HOST:CONTAINER mapping - host port 27018 forwards to port 27017 in the container.
- 27018:27017
<?xml version="1.0"?>
<!-- Server settings: logging, TCP port, OpenSSL client options, limits,
     data path, and where users and dictionaries are configured. -->
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<!-- Certificate verification is disabled and any certificate is accepted. -->
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
<!-- Dictionary definitions are taken from the XML files matching this glob. -->
<dictionaries_config>/etc/clickhouse-server/config.d/*.xml</dictionaries_config>
</yandex>
<?xml version="1.0"?>
<!-- Declares one user, `default`, plus the empty `default` profile and quota it refers to. -->
<yandex>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<!-- Empty password. -->
<password></password>
<!-- ::/0 places no restriction on the client address. -->
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>
#-*- coding: utf-8 -*-
class Layout(object):
    """Dictionary layout: its XML snippet and the family it belongs to.

    The family is derived from the name prefix: names starting with
    'complex' are "complex", names starting with "range" are "ranged",
    everything else is "simple".  Exactly one of ``is_complex``,
    ``is_ranged`` and ``is_simple`` is True.
    """

    # Layout name -> XML placed inside the <layout> element.
    LAYOUTS_STR_DICT = {
        'flat': '<flat/>',
        'hashed': '<hashed/>',
        'cache': '<cache><size_in_cells>128</size_in_cells></cache>',
        'complex_key_hashed': '<complex_key_hashed/>',
        'complex_key_cache': '<complex_key_cache><size_in_cells>128</size_in_cells></complex_key_cache>',
        'range_hashed': '<range_hashed/>'
    }

    def __init__(self, name):
        self.name = name

        if name.startswith('complex'):
            family = "complex"
        elif name.startswith("range"):
            family = "ranged"
        else:
            family = "simple"

        self.layout_type = family
        self.is_complex = family == "complex"
        self.is_simple = family == "simple"
        self.is_ranged = family == "ranged"

    def get_str(self):
        """Return the XML snippet for this layout (KeyError for an unknown name)."""
        return self.LAYOUTS_STR_DICT[self.name]

    def get_key_block_name(self):
        """Return the name of the key element: 'key' for complex layouts, 'id' otherwise."""
        return 'key' if self.is_complex else 'id'
class Row(object):
    """One record of test data, addressable by field name."""

    def __init__(self, fields, values):
        # Values are matched to fields by position; zip() stops at the
        # shorter of the two sequences.
        self.data = {column.name: cell for column, cell in zip(fields, values)}

    def get_value_by_name(self, name):
        """Return the value stored for the field called `name` (KeyError if absent)."""
        return self.data[name]
class Field(object):
    """A dictionary column and the XML fragments that describe it.

    name            -- column / attribute name.
    field_type      -- type name written into ``<type>``.
    is_key          -- True when the column is (part of) the dictionary key.
    is_range_key    -- True when the column is flagged as the range key.
    default         -- value written into ``<null_value>``; None leaves the
                       element empty.
    hierarchical    -- rendered as ``true`` / ``false`` in ``<hierarchical>``.
    range_hash_type -- suffix of the ``<range_...>`` element; any value other
                       than None marks the field as a range bound (``is_range``).
    """

    def __init__(self, name, field_type, is_key=False, is_range_key=False, default=None, hierarchical=False, range_hash_type=None):
        self.name = name
        self.field_type = field_type
        self.is_key = is_key
        self.default = default
        self.hierarchical = hierarchical
        self.range_hash_type = range_hash_type
        self.is_range = self.range_hash_type is not None
        self.is_range_key = is_range_key

    def get_attribute_str(self):
        """Return the ``<attribute>`` element describing this field."""
        template = (
            '\n'
            '<attribute>\n'
            '<name>{name}</name>\n'
            '<type>{field_type}</type>\n'
            '<null_value>{default}</null_value>\n'
            '<hierarchical>{hierarchical}</hierarchical>\n'
            '</attribute>'
        )
        return template.format(
            name=self.name,
            field_type=self.field_type,
            # Only None means "no default"; an explicit falsy default such
            # as 0 must still be written out.
            default='' if self.default is None else self.default,
            hierarchical='true' if self.hierarchical else 'false',
        )

    def get_simple_index_str(self):
        """Return the ``<name>`` element used inside a simple key block."""
        return '<name>{name}</name>'.format(name=self.name)

    def get_range_hash_str(self):
        """Return the ``<range_...>`` element for a range-bound field.

        Raises Exception when the field has no range hash type.
        """
        if not self.range_hash_type:
            raise Exception("Field {} is not range hashed".format(self.name))
        template = (
            '\n'
            '<range_{type}>\n'
            '<name>{name}</name>\n'
            '</range_{type}>\n'
        )
        return template.format(type=self.range_hash_type, name=self.name)
class DictionaryStructure(object):
    """Sort fields into keys, range bounds and attributes for a given layout.

    Renders the ``<layout>``/``<structure>`` part of a dictionary config and
    builds the dictGet expression used to read a value back.

    Raises Exception when the fields do not fit the layout: more than one
    key for a non-complex layout, more than one range key, or a ranged
    layout without a range key and exactly two range-bound fields.
    """

    def __init__(self, layout, fields):
        self.layout = layout
        self.keys = []
        self.range_key = None
        self.ordinary_fields = []
        self.range_fields = []

        for field in fields:
            if field.is_key:
                self.keys.append(field)
            elif field.is_range:
                self.range_fields.append(field)
            else:
                self.ordinary_fields.append(field)

            # The range-key flag is independent of the classification above.
            if field.is_range_key:
                if self.range_key is not None:
                    raise Exception("Duplicate range key {}".format(field.name))
                self.range_key = field

        if not self.layout.is_complex and len(self.keys) > 1:
            raise Exception("More than one key {} field in non complex layout {}".format(len(self.keys), self.layout.name))

        if self.layout.is_ranged and (not self.range_key or len(self.range_fields) != 2):
            raise Exception("Inconsistent configuration of ranged dictionary")

    @staticmethod
    def _quote_if_str(value):
        """Wrap str values in single quotes; return anything else unchanged."""
        if isinstance(value, str):
            return "'" + value + "'"
        return value

    def get_structure_str(self):
        """Return the ``<layout>`` and ``<structure>`` XML of the dictionary."""
        fields_strs = [field.get_attribute_str() for field in self.ordinary_fields]

        if self.layout.is_complex:
            key_strs = [key_field.get_attribute_str() for key_field in self.keys]
        else:  # same for simple and ranged
            key_strs = [key_field.get_simple_index_str() for key_field in self.keys]

        ranged_strs = []
        if self.layout.is_ranged:
            ranged_strs = [range_field.get_range_hash_str() for range_field in self.range_fields]

        template = (
            '\n'
            '<layout>\n'
            '{layout_str}\n'
            '</layout>\n'
            '<structure>\n'
            '<{key_block_name}>\n'
            '{key_str}\n'
            '</{key_block_name}>\n'
            '{attributes_str}\n'
            '{range_strs}\n'
            '</structure>'
        )
        return template.format(
            layout_str=self.layout.get_str(),
            key_block_name=self.layout.get_key_block_name(),
            key_str='\n'.join(key_strs),
            attributes_str='\n'.join(fields_strs),
            range_strs='\n'.join(ranged_strs),
        )

    def get_dict_get_expression(self, dict_name, field, row):
        """Return the ``dictGet<Type>(...)`` call reading `field` for `row`.

        dict_name -- dictionary name placed in the call.
        field     -- attribute to read; must not be one of the key fields.
        row       -- object whose ``data`` dict maps field names to values.

        Raises Exception when `field` is a key field.
        """
        if field in self.keys:
            raise Exception("Trying to receive key field {} from dictionary".format(field.name))

        if not self.layout.is_complex:
            key_expr = 'toUInt64({})'.format(row.data[self.keys[0].name])
        else:
            key_exprs_strs = [
                'to{type}({value})'.format(type=key.field_type, value=self._quote_if_str(row.data[key.name]))
                for key in self.keys
            ]
            # tuple(...) instead of bare parentheses: "(x)" with a single key
            # is just a parenthesised scalar, while tuple(x) is a real
            # one-element tuple; for several keys both spellings are the same.
            key_expr = 'tuple(' + ','.join(key_exprs_strs) + ')'

        date_expr = ''
        if self.layout.is_ranged:
            range_value = self._quote_if_str(row.data[self.range_key.name])
            date_expr = ', ' + "to{type}({val})".format(type=self.range_key.field_type, val=range_value)

        return "dictGet{field_type}('{dict_name}', '{field_name}', {key_expr}{date_expr})".format(
            field_type=field.field_type,
            dict_name=dict_name,
            field_name=field.name,
            key_expr=key_expr,
            date_expr=date_expr,
        )
class Dictionary(object):
    """Bundle a dictionary name, structure, external source and config file.

    name        -- dictionary name written into the config and used in queries.
    structure   -- object providing get_structure_str(), get_dict_get_expression()
                   and a ``layout`` attribute.
    source      -- external source providing get_source_str(), prepare() and
                   load_data().
    config_path -- path of the XML file written by generate_config().
    table_name  -- name passed to the source for its table.
    """

    def __init__(self, name, structure, source, config_path, table_name):
        self.name = name
        self.structure = structure
        self.source = source
        self.config_path = config_path
        self.table_name = table_name

    def generate_config(self):
        """Write the dictionary XML config to ``config_path`` (overwriting it)."""
        template = (
            '\n'
            '<dictionaries>\n'
            '<dictionary>\n'
            '<lifetime>\n'
            '<min>3</min>\n'
            '<max>5</max>\n'
            '</lifetime>\n'
            '<name>{name}</name>\n'
            '{structure}\n'
            '<source>\n'
            '{source}\n'
            '</source>\n'
            '</dictionary>\n'
            '</dictionaries>\n'
        )
        with open(self.config_path, 'w') as result:
            result.write(template.format(
                name=self.name,
                structure=self.structure.get_structure_str(),
                source=self.source.get_source_str(self.table_name),
            ))

    def prepare_source(self):
        """Ask the source to prepare storage for this dictionary's table."""
        self.source.prepare(self.structure, self.table_name)

    def load_data(self, data):
        """Load `data` into the source table.

        Raises Exception when the source has not been prepared.
        """
        # `prepared` only appears on a source once prepare() has run, so a
        # missing attribute means "not prepared" rather than an AttributeError.
        if not getattr(self.source, 'prepared', False):
            raise Exception("Cannot load data for dictionary {}, source is not prepared".format(self.name))
        self.source.load_data(data, self.table_name)

    def get_select_query(self, field, row):
        """Return a ``select`` query reading `field` for the key held in `row`."""
        return 'select {}'.format(self.structure.get_dict_get_expression(self.name, field, row))

    def is_complex(self):
        """Return True when the dictionary uses a complex-key layout."""
        return self.structure.layout.is_complex
# -*- coding: utf-8 -*-
import warnings
import pymysql.cursors
import pymongo
class ExternalSource(object):
    """Base class for external dictionary sources.

    Stores two endpoints for the same service.  The subclasses in this file
    connect to ``internal_hostname:internal_port`` themselves and write
    ``docker_hostname:docker_port`` into the dictionary config.

    Subclasses implement get_source_str(), prepare() and load_data();
    prepare() is expected to set ``prepared`` to True.
    """

    def __init__(self, name, internal_hostname, internal_port,
                 docker_hostname, docker_port, user, password):
        self.name = name
        self.internal_hostname = internal_hostname
        self.internal_port = int(internal_port)
        self.docker_hostname = docker_hostname
        self.docker_port = int(docker_port)
        self.user = user
        self.password = password
        # Callers check this flag before loading data, so it has to exist
        # even when prepare() was never called.
        self.prepared = False

    def get_source_str(self, table_name=None):
        """Return the XML placed inside ``<source>`` for `table_name`."""
        raise NotImplementedError("Method {} is not implemented for {}".format(
            "get_source_str", self.__class__.__name__))

    def prepare(self, structure, table_name=None):
        """Create whatever storage the source needs for `structure`."""
        raise NotImplementedError("Method {} is not implemented for {}".format(
            "prepare", self.__class__.__name__))

    def load_data(self, data, table_name=None):
        """Insert `data` (a list of Row objects) into `table_name`."""
        raise NotImplementedError("Method {} is not implemented for {}".format(
            "load_data", self.__class__.__name__))
class SourceMySQL(ExternalSource):
    """MySQL-backed dictionary source; tables live in the ``test`` database."""

    # Dictionary field type -> MySQL column type used in ``create table``.
    TYPE_MAPPING = {
        'UInt8': 'tinyint unsigned',
        'UInt16': 'smallint unsigned',
        'UInt32': 'int unsigned',
        'UInt64': 'bigint unsigned',
        'Int8': 'tinyint',
        'Int16': 'smallint',
        'Int32': 'int',
        'Int64': 'bigint',
        'UUID': 'varchar(36)',
        'Date': 'date',
        'DateTime': 'datetime',
        'String': 'text',
        'Float32': 'float',
        'Float64': 'double'
    }

    def create_mysql_conn(self):
        """Open a connection to the internal endpoint and keep it on the instance."""
        self.connection = pymysql.connect(
            user=self.user,
            password=self.password,
            host=self.internal_hostname,
            port=self.internal_port)

    def execute_mysql_query(self, query):
        """Execute `query` with Python warnings silenced, then commit."""
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            with self.connection.cursor() as cursor:
                cursor.execute(query)
            self.connection.commit()

    def get_source_str(self, table_name):
        """Return the ``<mysql>`` source XML pointing at ``test.<table_name>``."""
        template = (
            '\n'
            '<mysql>\n'
            '<replica>\n'
            '<priority>1</priority>\n'
            '<host>127.0.0.1</host>\n'
            '<port>3333</port> <!-- Wrong port, for testing basic failover to work. -->\n'
            '</replica>\n'
            '<replica>\n'
            '<priority>2</priority>\n'
            '<host>{hostname}</host>\n'
            '<port>{port}</port>\n'
            '</replica>\n'
            '<user>{user}</user>\n'
            '<password>{password}</password>\n'
            '<db>test</db>\n'
            '<table>{tbl}</table>\n'
            '</mysql>'
        )
        return template.format(
            hostname=self.docker_hostname,
            port=self.docker_port,
            user=self.user,
            password=self.password,
            tbl=table_name,
        )

    def prepare(self, structure, table_name):
        """Connect, create the ``test`` database if needed and create the table.

        Columns are the structure's keys, ordinary fields and range fields,
        in that order.  Marks the source as prepared.
        """
        self.create_mysql_conn()
        self.execute_mysql_query("create database if not exists test default character set 'utf8'")
        columns = structure.keys + structure.ordinary_fields + structure.range_fields
        fields_strs = [field.name + ' ' + self.TYPE_MAPPING[field.field_type] for field in columns]
        create_query = '''create table test.{table_name} (
{fields_str});
'''.format(table_name=table_name, fields_str=','.join(fields_strs))
        self.execute_mysql_query(create_query)
        self.prepared = True

    def load_data(self, data, table_name):
        """Insert every Row in `data` into ``test.<table_name>`` with one statement.

        Does nothing when `data` is empty.  str values are wrapped in single
        quotes without escaping, so they must not contain quotes themselves.
        """
        if not data:
            return

        # Column order is taken from the first row and applied to all rows.
        ordered_names = list(data[0].data)
        values_strs = []
        for row in data:
            rendered = []
            for name in ordered_names:
                value = row.data[name]
                if isinstance(value, str):
                    rendered.append("'" + value + "'")
                else:
                    rendered.append(str(value))
            values_strs.append('(' + ','.join(rendered) + ')')

        # Row tuples of a multi-row insert must be separated by commas.
        query = 'insert into test.{} ({}) values {}'.format(
            table_name,
            ','.join(ordered_names),
            ','.join(values_strs))
        self.execute_mysql_query(query)
class SourceMongo(ExternalSource):
    """MongoDB-backed dictionary source; collections live in the ``test`` database."""

    def get_source_str(self, table_name):
        """Return the ``<mongodb>`` source XML pointing at collection `table_name`."""
        template = (
            '\n'
            '<mongodb>\n'
            '<host>{host}</host>\n'
            '<port>{port}</port>\n'
            '<user>{user}</user>\n'
            '<password>{password}</password>\n'
            '<db>test</db>\n'
            '<collection>{tbl}</collection>\n'
            '</mongodb>\n'
        )
        return template.format(
            host=self.docker_hostname,
            port=self.docker_port,
            user=self.user,
            password=self.password,
            tbl=table_name,
        )

    def prepare(self, structure, table_name):
        """Connect to the internal endpoint and select the ``test`` database.

        No collection is created here.  Marks the source as prepared.
        """
        connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format(
            host=self.internal_hostname, port=self.internal_port,
            user=self.user, password=self.password)
        self.connection = pymongo.MongoClient(connection_str)
        self.structure = structure
        self.db = self.connection['test']
        self.prepared = True

    def load_data(self, data, table_name):
        """Insert every Row in `data` as a document into collection `table_name`."""
        tbl = self.db[table_name]
        to_insert = [dict(row.data) for row in data]
        result = tbl.insert_many(to_insert)
        # Debug output: the inserted ids, then the full collection contents.
        print("IDS: {}".format(result.inserted_ids))
        for r in tbl.find():
            print("RESULT: {}".format(r))
import pytest
import os
import time
from helpers.cluster import ClickHouseCluster
from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout
from external_sources import SourceMySQL, SourceMongo
# Directory containing this test module.
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))


def _value_fields():
    """Return fresh attribute fields named ``<Type>_``, one per tested type."""
    type_names = [
        'UInt8', 'UInt16', 'UInt32', 'UInt64',
        'Int8', 'Int16', 'Int32', 'Int64',
        'UUID', 'Date', 'DateTime', 'String',
        'Float32', 'Float64',
    ]
    return [Field(type_name + "_", type_name) for type_name in type_names]


# Field sets per layout family: the key (and range) columns differ, the
# attribute columns are the same everywhere.
FIELDS = {
    "simple": [
        Field("KeyField", 'UInt64', is_key=True),
    ] + _value_fields(),
    "complex": [
        Field("KeyField1", 'UInt64', is_key=True),
        Field("KeyField2", 'String', is_key=True),
    ] + _value_fields(),
    "ranged": [
        Field("KeyField1", 'UInt64', is_key=True),
        Field("KeyField2", 'Date', is_range_key=True),
        Field("StartDate", 'Date', range_hash_type='min'),
        Field("EndDate", 'Date', range_hash_type='max'),
    ] + _value_fields(),
}

LAYOUTS = list(map(Layout, [
    "cache",
    "hashed",
    "flat",
    "complex_key_hashed",
    "complex_key_cache",
    "range_hashed",
]))

SOURCES = [
    # The MongoDB source is left commented out; only MySQL is exercised.
    #SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"),
    SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"),
]

# Filled in by setup_module().
DICTIONARIES = []
cluster = None
node = None
def setup_module(module):
    """Generate one dictionary config per (layout, source) pair and define the cluster.

    Empties ``configs/dictionaries``, writes a fresh XML file for every
    combination of LAYOUTS and SOURCES, records the Dictionary objects in
    DICTIONARIES and registers a single instance that receives every file
    in that directory as a main config.
    """
    global DICTIONARIES
    global cluster
    global node

    dict_configs_path = os.path.join(SCRIPT_DIR, 'configs/dictionaries')
    # The directory holds only generated files and may not exist yet;
    # without it the listing below would fail.
    if not os.path.isdir(dict_configs_path):
        os.makedirs(dict_configs_path)
    for f in os.listdir(dict_configs_path):
        os.remove(os.path.join(dict_configs_path, f))

    for layout in LAYOUTS:
        for source in SOURCES:
            structure = DictionaryStructure(layout, FIELDS[layout.layout_type])
            dict_name = source.name + "_" + layout.name
            dict_path = os.path.join(dict_configs_path, dict_name + '.xml')
            dictionary = Dictionary(dict_name, structure, source, dict_path, "table_" + dict_name)
            print(dict_name)
            dictionary.generate_config()
            DICTIONARIES.append(dictionary)

    main_configs = [os.path.join(dict_configs_path, fname) for fname in os.listdir(dict_configs_path)]
    cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
    node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True)
@pytest.fixture(scope="module")
def started_cluster():
    """Start the cluster, prepare every dictionary source, and shut down afterwards.

    The cluster is shut down even when start-up or source preparation fails.
    """
    try:
        cluster.start()
        for dictionary in DICTIONARIES:
            print("Preparing {}".format(dictionary.name))
            dictionary.prepare_source()
            print("Prepared")

        yield cluster

    finally:
        cluster.shutdown()
def test_simple_dictionaries(started_cluster):
    """Load one row into every simple-layout dictionary and read each attribute back."""
    fields = FIELDS["simple"]
    data = [
        Row(fields, [1, 22, 333, 4444, 55555, -6, -77,
                     -888, -999, '550e8400-e29b-41d4-a716-446655440003',
                     '1973-06-28', '1985-02-28 23:43:25', 'hello', 22.543, 3332154213.4]),
    ]

    simple_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "simple"]
    for dct in simple_dicts:
        dct.load_data(data)

    node.query("system reload dictionaries")

    # Build every query first, then run them.
    value_fields = [field for field in fields if not field.is_key]
    queries_with_answers = []
    for dct in simple_dicts:
        for row in data:
            for field in value_fields:
                expected = row.get_value_by_name(field.name)
                queries_with_answers.append((dct.get_select_query(field, row), expected))

    for query, answer in queries_with_answers:
        print(query)
        assert node.query(query) == str(answer) + '\n'
def test_complex_dictionaries(started_cluster):
    """Load one row into every complex-key dictionary and read each attribute back."""
    fields = FIELDS["complex"]
    data = [
        Row(fields, [1, 'world', 22, 333, 4444, 55555, -6,
                     -77, -888, -999, '550e8400-e29b-41d4-a716-446655440003',
                     '1973-06-28', '1985-02-28 23:43:25',
                     'hello', 22.543, 3332154213.4]),
    ]

    complex_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "complex"]
    for dct in complex_dicts:
        dct.load_data(data)

    node.query("system reload dictionaries")

    # Build every query first, then run them.
    value_fields = [field for field in fields if not field.is_key]
    queries_with_answers = []
    for dct in complex_dicts:
        for row in data:
            for field in value_fields:
                expected = row.get_value_by_name(field.name)
                queries_with_answers.append((dct.get_select_query(field, row), expected))

    for query, answer in queries_with_answers:
        print(query)
        assert node.query(query) == str(answer) + '\n'
def test_ranged_dictionaries(started_cluster):
    """Load one row into every ranged dictionary and read each attribute back.

    Key fields and the range-bound fields are not queried.
    """
    fields = FIELDS["ranged"]
    data = [
        Row(fields, [1, '2019-02-10', '2019-02-01', '2019-02-28',
                     22, 333, 4444, 55555, -6, -77, -888, -999,
                     '550e8400-e29b-41d4-a716-446655440003',
                     '1973-06-28', '1985-02-28 23:43:25', 'hello',
                     22.543, 3332154213.4]),
    ]

    ranged_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "ranged"]
    for dct in ranged_dicts:
        dct.load_data(data)

    node.query("system reload dictionaries")

    # Build every query first, then run them.
    value_fields = [field for field in fields if not (field.is_key or field.is_range)]
    queries_with_answers = []
    for dct in ranged_dicts:
        for row in data:
            for field in value_fields:
                expected = row.get_value_by_name(field.name)
                queries_with_answers.append((dct.get_select_query(field, row), expected))

    for query, answer in queries_with_answers:
        print(query)
        assert node.query(query) == str(answer) + '\n'
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册