diff --git a/dbms/tests/integration/pytest.ini b/dbms/tests/integration/pytest.ini index e51d0efad3ddd1343c64fabe8c8100a4c8d79bdc..dc5bb603b636d7351b4d6282a897324bde935104 100644 --- a/dbms/tests/integration/pytest.ini +++ b/dbms/tests/integration/pytest.ini @@ -1,3 +1,3 @@ [pytest] -python_files = test.py +python_files = test*.py norecursedirs = _instances diff --git a/dbms/tests/integration/test_external_dictionaries/dictionary.py b/dbms/tests/integration/test_external_dictionaries/dictionary.py index c468c2bfc678ded3817fd1ebf48a7d1b5283a166..05aa9bfa59d8d239ec38b00a44bf92be8795e037 100644 --- a/dbms/tests/integration/test_external_dictionaries/dictionary.py +++ b/dbms/tests/integration/test_external_dictionaries/dictionary.py @@ -1,4 +1,4 @@ -#-*- coding: utf-8 -*- +# -*- coding: utf-8 -*- import copy @@ -9,7 +9,7 @@ class Layout(object): 'cache': '128', 'complex_key_hashed': '', 'complex_key_cache': '128', - 'range_hashed': '' + 'range_hashed': '', } def __init__(self, name): @@ -18,13 +18,13 @@ class Layout(object): self.is_simple = False self.is_ranged = False if self.name.startswith('complex'): - self.layout_type = "complex" + self.layout_type = 'complex' self.is_complex = True - elif name.startswith("range"): - self.layout_type = "ranged" + elif name.startswith('range'): + self.layout_type = 'ranged' self.is_ranged = True else: - self.layout_type = "simple" + self.layout_type = 'simple' self.is_simple = True def get_str(self): @@ -33,8 +33,7 @@ class Layout(object): def get_key_block_name(self): if self.is_complex: return 'key' - else: - return 'id' + return 'id' class Row(object): @@ -90,13 +89,12 @@ class Field(object): class DictionaryStructure(object): - def __init__(self, layout, fields, is_kv=False): + def __init__(self, layout, fields): self.layout = layout self.keys = [] self.range_key = None self.ordinary_fields = [] self.range_fields = [] - self.is_kv = is_kv for field in fields: if field.is_key: @@ -121,14 +119,12 @@ class DictionaryStructure(object): fields_strs = [] for field in self.ordinary_fields: fields_strs.append(field.get_attribute_str()) - if self.is_kv: - break key_strs = [] if self.layout.is_complex: for key_field in self.keys: key_strs.append(key_field.get_attribute_str()) - else: # same for simple and ranged + else: # same for simple and ranged for key_field in self.keys: key_strs.append(key_field.get_simple_index_str()) @@ -288,13 +284,14 @@ class DictionaryStructure(object): class Dictionary(object): - def __init__(self, name, structure, source, config_path, table_name): + def __init__(self, name, structure, source, config_path, table_name, fields=None, values=None): self.name = name self.structure = copy.deepcopy(structure) self.source = copy.deepcopy(source) self.config_path = config_path self.table_name = table_name - self.is_kv = source.is_kv + self.fields = fields + self.values = values def generate_config(self): with open(self.config_path, 'w') as result: diff --git a/dbms/tests/integration/test_external_dictionaries/external_sources.py b/dbms/tests/integration/test_external_dictionaries/external_sources.py index 6830f9500c8b89b49753295ea166547fbf02476b..a22cc6e024f3fdf4e00bb3486c26cd910938b2ca 100644 --- a/dbms/tests/integration/test_external_dictionaries/external_sources.py +++ b/dbms/tests/integration/test_external_dictionaries/external_sources.py @@ -3,6 +3,7 @@ import warnings import pymysql.cursors import pymongo import redis +import aerospike from tzlocal import get_localzone import datetime import os @@ -12,7 +13,7 @@ import time class ExternalSource(object): def __init__(self, name, internal_hostname, internal_port, - docker_hostname, docker_port, user, password, is_kv): + docker_hostname, docker_port, user, password, storage_type=None): self.name = name self.internal_hostname = internal_hostname self.internal_port = int(internal_port) @@ -20,7 +21,7 @@ class ExternalSource(object): self.docker_port = int(docker_port) self.user = user self.password = password - self.is_kv = is_kv + self.storage_type = storage_type def get_source_str(self, table_name): raise NotImplementedError("Method {} is not implemented for {}".format( @@ -38,9 +39,6 @@ class ExternalSource(object): def compatible_with_layout(self, layout): return True - def prepare_value_for_type(self, field, value): - return value - class SourceMySQL(ExternalSource): TYPE_MAPPING = { @@ -388,10 +386,12 @@ class SourceRedis(ExternalSource): {host} {port} 0 + {storage_type} '''.format( host=self.docker_hostname, port=self.docker_port, + storage_type=self.storage_type, # simple or hash_map ) def prepare(self, structure, table_name, cluster): @@ -399,33 +399,96 @@ class SourceRedis(ExternalSource): self.prepared = True def load_data(self, data, table_name): - for row_num, row in enumerate(data): # FIXME: yield - self.client.execute_command("FLUSHDB") + self.client.flushdb() + for row in data: for cell_name, cell_value in row.data.items(): value_type = "$" if isinstance(cell_value, int): value_type = ":" else: cell_value = '"' + str(cell_value).replace(' ', '\s') + '"' - cmd = "SET " + "$" + cell_name + " " + value_type + str(cell_value) + cmd = "SET ${} {}{}".format(cell_name, value_type, cell_value) print(cmd) self.client.execute_command(cmd) - return + + def load_kv_data(self, values): + self.client.flushdb() + if len(values[0]) == 2: + self.client.mset({value[0]: value[1] for value in values}) + else: + for value in values: + self.client.hset(value[0], value[1], value[2]) def compatible_with_layout(self, layout): - if not layout.is_simple: - return False - return True + if layout.is_simple and self.storage_type == "simple" or layout.is_complex and self.storage_type == "simple": + return True + return False + + +class SourceAerospike(ExternalSource): + def __init__(self, name, internal_hostname, internal_port, + docker_hostname, docker_port, user, password, storage_type=None): + ExternalSource.__init__(self, name, internal_hostname, internal_port, + docker_hostname, docker_port, user, password, storage_type) + self.namespace = "test" + self.set = "test_set" - def prepare_value_for_type(self, field, value): - if field.field_type == "Date": - dt = dateutil.parser.parse(value) - return int(time.mktime(dt.timetuple()) // 86400) - if field.field_type == "DateTime": - dt = dateutil.parser.parse(value) - return int(time.mktime(dt.timetuple())) - if field.field_type == "Float32": - return str(value) - if field.field_type == "Float64": - return str(value) - return value + def get_source_str(self, table_name): + print("AEROSPIKE get source str") + return ''' + + {host} + {port} + + '''.format( + host=self.docker_hostname, + port=self.docker_port, + storage_type=self.storage_type, # simple or hash_map + ) + + def prepare(self, structure, table_name, cluster): + config = { + 'hosts': [ (self.internal_hostname, self.internal_port) ] + } + self.client = aerospike.client(config).connect() + self.prepared = True + print("PREPARED AEROSPIKE") + print(config) + + def compatible_with_layout(self, layout): + print("compatible AEROSPIKE") + return layout.is_simple + + def _flush_aerospike_db(self): + keys = [] + + def handle_record((key, metadata, record)): + print("Handle record {} {}".format(key, record)) + keys.append(key) + + def print_record((key, metadata, record)): + print("Print record {} {}".format(key, record)) + + scan = self.client.scan(self.namespace, self.set) + scan.foreach(handle_record) + + [self.client.remove(key) for key in keys] + + def load_kv_data(self, values): + self._flush_aerospike_db() + + print("Load KV Data Aerospike") + if len(values[0]) == 2: + for value in values: + key = (self.namespace, self.set, value[0]) + print(key) + self.client.put(key, {"bin_value": value[1]}, policy={"key": aerospike.POLICY_KEY_SEND}) + assert self.client.exists(key) + else: + assert("VALUES SIZE != 2") + + # print(values) + + def load_data(self, data, table_name): + print("Load Data Aerospike") + # print(data) diff --git a/dbms/tests/integration/test_external_dictionaries/test.py b/dbms/tests/integration/test_external_dictionaries/test.py index c42727c76a8de84069093d814e59125563305fd1..841a9124af0975fbe9600cdf5067f7f4215be69e 100644 --- a/dbms/tests/integration/test_external_dictionaries/test.py +++ b/dbms/tests/integration/test_external_dictionaries/test.py @@ -3,8 +3,8 @@ import os from helpers.cluster import ClickHouseCluster from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout -from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed, SourceMongo -from external_sources import SourceHTTP, SourceHTTPS, SourceRedis +from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed +from external_sources import SourceMongo, SourceHTTP, SourceHTTPS SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -78,16 +78,15 @@ LAYOUTS = [ ] SOURCES = [ - SourceRedis("Redis", "localhost", "6380", "redis1", "6379", "", "", True), - SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse", False), - SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse", False), - SourceClickHouse("RemoteClickHouse", "localhost", "9000", "clickhouse1", "9000", "default", "", False), - SourceClickHouse("LocalClickHouse", "localhost", "9000", "node", "9000", "default", "", False), - SourceFile("File", "localhost", "9000", "node", "9000", "", "", False), - SourceExecutableHashed("ExecutableHashed", "localhost", "9000", "node", "9000", "", "", False), - SourceExecutableCache("ExecutableCache", "localhost", "9000", "node", "9000", "", "", False), - SourceHTTP("SourceHTTP", "localhost", "9000", "clickhouse1", "9000", "", "", False), - SourceHTTPS("SourceHTTPS", "localhost", "9000", "clickhouse1", "9000", "", "", False), + SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"), + SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"), + SourceClickHouse("RemoteClickHouse", "localhost", "9000", "clickhouse1", "9000", "default", ""), + SourceClickHouse("LocalClickHouse", "localhost", "9000", "node", "9000", "default", ""), + SourceFile("File", "localhost", "9000", "node", "9000", "", ""), + SourceExecutableHashed("ExecutableHashed", "localhost", "9000", "node", "9000", "", ""), + SourceExecutableCache("ExecutableCache", "localhost", "9000", "node", "9000", "", ""), + SourceHTTP("SourceHTTP", "localhost", "9000", "clickhouse1", "9000", "", ""), + SourceHTTPS("SourceHTTPS", "localhost", "9000", "clickhouse1", "9000", "", ""), ] DICTIONARIES = [] @@ -95,6 +94,7 @@ DICTIONARIES = [] cluster = None node = None + def setup_module(module): global DICTIONARIES global cluster @@ -107,9 +107,9 @@ def setup_module(module): for layout in LAYOUTS: for source in SOURCES: if source.compatible_with_layout(layout): - structure = DictionaryStructure(layout, FIELDS[layout.layout_type], source.is_kv) + structure = DictionaryStructure(layout, FIELDS[layout.layout_type]) dict_name = source.name + "_" + layout.name - dict_path = os.path.join(dict_configs_path, dict_name + '.xml') # FIXME: single xml config for every column + dict_path = os.path.join(dict_configs_path, dict_name + '.xml') dictionary = Dictionary(dict_name, structure, source, dict_path, "table_" + dict_name) dictionary.generate_config() DICTIONARIES.append(dictionary) @@ -120,9 +120,10 @@ def setup_module(module): for fname in os.listdir(dict_configs_path): main_configs.append(os.path.join(dict_configs_path, fname)) cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs')) - node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True, with_redis=True) + node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True) cluster.add_instance('clickhouse1') + @pytest.fixture(scope="module") def started_cluster(): try: @@ -137,39 +138,28 @@ def started_cluster(): finally: cluster.shutdown() -def prepare_row(dct, fields, values): - prepared_values = [] - for field, value in zip(fields, values): - prepared_values.append(dct.source.prepare_value_for_type(field, value)) - return Row(fields, prepared_values) - -def prepare_data(dct, fields, values_by_row): - data = [] - for row in values_by_row: - data.append(prepare_row(dct, fields, row)) - return data def test_simple_dictionaries(started_cluster): fields = FIELDS["simple"] - values_by_row = [ - [1, 22, 333, 4444, 55555, -6, -77, - -888, -999, '550e8400-e29b-41d4-a716-446655440003', - '1973-06-28', '1985-02-28 23:43:25', 'hello', 22.543, 3332154213.4, 0], - [2, 3, 4, 5, 6, -7, -8, - -9, -10, '550e8400-e29b-41d4-a716-446655440002', - '1978-06-28', '1986-02-28 23:42:25', 'hello', 21.543, 3222154213.4, 1], + data = [ + Row(fields, + [1, 22, 333, 4444, 55555, -6, -77, + -888, -999, '550e8400-e29b-41d4-a716-446655440003', + '1973-06-28', '1985-02-28 23:43:25', 'hello', 22.543, 3332154213.4, 0]), + Row(fields, + [2, 3, 4, 5, 6, -7, -8, + -9, -10, '550e8400-e29b-41d4-a716-446655440002', + '1978-06-28', '1986-02-28 23:42:25', 'hello', 21.543, 3222154213.4, 1]), ] simple_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "simple"] for dct in simple_dicts: - data = prepare_data(dct, fields, values_by_row) dct.load_data(data) node.query("system reload dictionaries") queries_with_answers = [] for dct in simple_dicts: - data = prepare_data(dct, fields, values_by_row) for row in data: for field in fields: if not field.is_key: @@ -181,8 +171,6 @@ def test_simple_dictionaries(started_cluster): for query in dct.get_select_get_or_default_queries(field, row): queries_with_answers.append((query, field.default_value_for_get)) - if dct.is_kv: - break for query in dct.get_hierarchical_queries(data[0]): queries_with_answers.append((query, [1])) @@ -201,29 +189,30 @@ def test_simple_dictionaries(started_cluster): answer = str(answer).replace(' ', '') assert node.query(query) == str(answer) + '\n' + def test_complex_dictionaries(started_cluster): fields = FIELDS["complex"] - values_by_row = [ - [1, 'world', 22, 333, 4444, 55555, -6, - -77, -888, -999, '550e8400-e29b-41d4-a716-446655440003', - '1973-06-28', '1985-02-28 23:43:25', - 'hello', 22.543, 3332154213.4], - [2, 'qwerty2', 52, 2345, 6544, 9191991, -2, - -717, -81818, -92929, '550e8400-e29b-41d4-a716-446655440007', - '1975-09-28', '2000-02-28 23:33:24', - 'my', 255.543, 3332221.44], + data = [ + Row(fields, + [1, 'world', 22, 333, 4444, 55555, -6, + -77, -888, -999, '550e8400-e29b-41d4-a716-446655440003', + '1973-06-28', '1985-02-28 23:43:25', + 'hello', 22.543, 3332154213.4]), + Row(fields, + [2, 'qwerty2', 52, 2345, 6544, 9191991, -2, + -717, -81818, -92929, '550e8400-e29b-41d4-a716-446655440007', + '1975-09-28', '2000-02-28 23:33:24', + 'my', 255.543, 3332221.44]), ] - complex_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "complex" and not d.is_kv] + complex_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "complex"] for dct in complex_dicts: - data = prepare_data(dct, fields, values_by_row) dct.load_data(data) node.query("system reload dictionaries") queries_with_answers = [] for dct in complex_dicts: - data = prepare_data(dct, fields, values_by_row) for row in data: for field in fields: if not field.is_key: @@ -240,38 +229,37 @@ def test_complex_dictionaries(started_cluster): print query assert node.query(query) == str(answer) + '\n' + def test_ranged_dictionaries(started_cluster): fields = FIELDS["ranged"] - values_by_row = [ - [1, '2019-02-10', '2019-02-01', '2019-02-28', - 22, 333, 4444, 55555, -6, -77, -888, -999, - '550e8400-e29b-41d4-a716-446655440003', - '1973-06-28', '1985-02-28 23:43:25', 'hello', - 22.543, 3332154213.4], - [2, '2019-04-10', '2019-04-01', '2019-04-28', - 11, 3223, 41444, 52515, -65, -747, -8388, -9099, - '550e8400-e29b-41d4-a716-446655440004', - '1973-06-29', '2002-02-28 23:23:25', '!!!!', - 32.543, 3332543.4], + data = [ + Row(fields, + [1, '2019-02-10', '2019-02-01', '2019-02-28', + 22, 333, 4444, 55555, -6, -77, -888, -999, + '550e8400-e29b-41d4-a716-446655440003', + '1973-06-28', '1985-02-28 23:43:25', 'hello', + 22.543, 3332154213.4]), + Row(fields, + [2, '2019-04-10', '2019-04-01', '2019-04-28', + 11, 3223, 41444, 52515, -65, -747, -8388, -9099, + '550e8400-e29b-41d4-a716-446655440004', + '1973-06-29', '2002-02-28 23:23:25', '!!!!', + 32.543, 3332543.4]), ] - ranged_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "ranged" and not d.is_kv] + ranged_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "ranged"] for dct in ranged_dicts: - data = prepare_data(dct, fields, values_by_row) dct.load_data(data) node.query("system reload dictionaries") queries_with_answers = [] for dct in ranged_dicts: - data = prepare_data(dct, fields, values_by_row) for row in data: for field in fields: if not field.is_key and not field.is_range: for query in dct.get_select_get_queries(field, row): queries_with_answers.append((query, row.get_value_by_name(field.name))) - if dct.is_kv: - break for query, answer in queries_with_answers: print query diff --git a/dbms/tests/integration/test_external_dictionaries/test_kv.py b/dbms/tests/integration/test_external_dictionaries/test_kv.py new file mode 100644 index 0000000000000000000000000000000000000000..b085e89b7d926effa1c989681d20f4782a1179d4 --- /dev/null +++ b/dbms/tests/integration/test_external_dictionaries/test_kv.py @@ -0,0 +1,321 @@ +import os + +import pytest +from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout +from external_sources import SourceRedis, SourceAerospike + +from helpers.cluster import ClickHouseCluster + +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) + +FIELDS = { + "simple": [ + Field("KeyField", 'UInt64', is_key=True, default_value_for_get=9999999), + Field("UInt8_", 'UInt8', default_value_for_get=55), + Field("UInt16_", 'UInt16', default_value_for_get=66), + Field("UInt32_", 'UInt32', default_value_for_get=77), + Field("UInt64_", 'UInt64', default_value_for_get=88), + Field("Int8_", 'Int8', default_value_for_get=-55), + Field("Int16_", 'Int16', default_value_for_get=-66), + Field("Int32_", 'Int32', default_value_for_get=-77), + Field("Int64_", 'Int64', default_value_for_get=-88), + Field("UUID_", 'UUID', default_value_for_get='550e8400-0000-0000-0000-000000000000'), + Field("Date_", 'Date', default_value_for_get='2018-12-30'), + Field("DateTime_", 'DateTime', default_value_for_get='2018-12-30 00:00:00'), + Field("String_", 'String', default_value_for_get='hi'), + Field("Float32_", 'Float32', default_value_for_get=555.11), + Field("Float64_", 'Float64', default_value_for_get=777.11), + Field("ParentKeyField", "UInt64", default_value_for_get=444, hierarchical=True), + ], + "complex": [ + Field("KeyField1", 'UInt64', is_key=True, default_value_for_get=9999999), + Field("KeyField2", 'String', is_key=True, default_value_for_get='xxxxxxxxx'), + Field("UInt8_", 'UInt8', default_value_for_get=55), + Field("UInt16_", 'UInt16', default_value_for_get=66), + Field("UInt32_", 'UInt32', default_value_for_get=77), + Field("UInt64_", 'UInt64', default_value_for_get=88), + Field("Int8_", 'Int8', default_value_for_get=-55), + Field("Int16_", 'Int16', default_value_for_get=-66), + Field("Int32_", 'Int32', default_value_for_get=-77), + Field("Int64_", 'Int64', default_value_for_get=-88), + Field("UUID_", 'UUID', default_value_for_get='550e8400-0000-0000-0000-000000000000'), + Field("Date_", 'Date', default_value_for_get='2018-12-30'), + Field("DateTime_", 'DateTime', default_value_for_get='2018-12-30 00:00:00'), + Field("String_", 'String', default_value_for_get='hi'), + Field("Float32_", 'Float32', default_value_for_get=555.11), + Field("Float64_", 'Float64', default_value_for_get=777.11), + ], + "ranged": [ + Field("KeyField1", 'UInt64', is_key=True), + Field("KeyField2", 'Date', is_range_key=True), + Field("StartDate", 'Date', range_hash_type='min'), + Field("EndDate", 'Date', range_hash_type='max'), + Field("UInt8_", 'UInt8', default_value_for_get=55), + Field("UInt16_", 'UInt16', default_value_for_get=66), + Field("UInt32_", 'UInt32', default_value_for_get=77), + Field("UInt64_", 'UInt64', default_value_for_get=88), + Field("Int8_", 'Int8', default_value_for_get=-55), + Field("Int16_", 'Int16', default_value_for_get=-66), + Field("Int32_", 'Int32', default_value_for_get=-77), + Field("Int64_", 'Int64', default_value_for_get=-88), + Field("UUID_", 'UUID', default_value_for_get='550e8400-0000-0000-0000-000000000000'), + Field("Date_", 'Date', default_value_for_get='2018-12-30'), + Field("DateTime_", 'DateTime', default_value_for_get='2018-12-30 00:00:00'), + Field("String_", 'String', default_value_for_get='hi'), + Field("Float32_", 'Float32', default_value_for_get=555.11), + Field("Float64_", 'Float64', default_value_for_get=777.11), + ], +} + +VALUES = { + "simple": [ + [ + 1, 22, 333, 4444, 55555, -6, -77, + -888, -999, '550e8400-e29b-41d4-a716-446655440003', + '1973-06-28', '1985-02-28 23:43:25', 'hello', 22.543, 3332154213.4, 0, + ], + [ + 2, 3, 4, 5, 6, -7, -8, + -9, -10, '550e8400-e29b-41d4-a716-446655440002', + '1978-06-28', '1986-02-28 23:42:25', 'hello', 21.543, 3222154213.4, 1, + ], + ], + "complex": [ + [ + 1, 'world', 22, 333, 4444, 55555, -6, + -77, -888, -999, '550e8400-e29b-41d4-a716-446655440003', + '1973-06-28', '1985-02-28 23:43:25', + 'hello', 22.543, 3332154213.4, + ], + [ + 2, 'qwerty2', 52, 2345, 6544, 9191991, -2, + -717, -81818, -92929, '550e8400-e29b-41d4-a716-446655440007', + '1975-09-28', '2000-02-28 23:33:24', + 'my', 255.543, 3332221.44, + ], + ], + "ranged": [ + [ + 1, '2019-02-10', '2019-02-01', '2019-02-28', + 22, 333, 4444, 55555, -6, -77, -888, -999, + '550e8400-e29b-41d4-a716-446655440003', + '1973-06-28', '1985-02-28 23:43:25', 'hello', + 22.543, 3332154213.4, + ], + [ + 2, '2019-04-10', '2019-04-01', '2019-04-28', + 11, 3223, 41444, 52515, -65, -747, -8388, -9099, + '550e8400-e29b-41d4-a716-446655440004', + '1973-06-29', '2002-02-28 23:23:25', '!!!!', + 32.543, 3332543.4, + ], + ], +} + +LAYOUTS = [ + Layout("flat"), + Layout("hashed"), + Layout("cache"), + Layout("complex_key_hashed"), + Layout("complex_key_cache"), + Layout("range_hashed"), +] + +SOURCES = [ + SourceRedis("RedisSimple", "localhost", "6380", "redis1", "6379", "", "", storage_type="simple"), + # SourceRedis("RedisHash", "localhost", "6380", "redis1", "6379", "", "", storage_type="hash_map"), + # SourceAerospike("Aerospike", "localhost", "3000", "aerospike1", "3000", "", ""), +] + +DICTIONARIES = [] + +cluster = None +node = None + + +def setup_kv_dict(suffix, layout, fields, kv_source, dict_configs_path, values): + global DICTIONARIES + + structure = DictionaryStructure(layout, fields) + dict_name = "{}_{}_{}".format(kv_source.name, layout.name, suffix) + dict_path = os.path.join(dict_configs_path, dict_name + '.xml') + dictionary = Dictionary(dict_name, structure, kv_source, dict_path, "table_" + dict_name, fields, values) + dictionary.generate_config() + DICTIONARIES.append(dictionary) + + +def setup_module(module): + global DICTIONARIES + global cluster + global node + + dict_configs_path = os.path.join(SCRIPT_DIR, 'configs/dictionaries') + for f in os.listdir(dict_configs_path): + os.remove(os.path.join(dict_configs_path, f)) + + for layout in LAYOUTS: + for source in SOURCES: + if source.compatible_with_layout(layout): + if layout.layout_type == "simple": + fields_len = len(FIELDS["simple"]) + for i in range(fields_len - 1): + local_fields = [FIELDS["simple"][0], FIELDS["simple"][i + 1]] + local_values = [[value[0], value[i + 1]] for value in VALUES["simple"]] + setup_kv_dict(i + 1, layout, local_fields, source, dict_configs_path, local_values) + elif layout.layout_type == "complex": + fields_len = len(FIELDS["complex"]) + for i in range(fields_len - 2): + local_fields = [FIELDS['complex'][1], FIELDS['complex'][i + 2]] + local_values = [[value[1], value[i + 2]] for value in VALUES["complex"]] + setup_kv_dict(i + 2, layout, local_fields, source, dict_configs_path, local_values) + elif layout.layout_type == "ranged": + fields_len = len(FIELDS["ranged"]) + local_fields = FIELDS["ranged"][0:5] + local_values = VALUES["ranged"][0:5] + for i in range(fields_len - 4): + local_fields[4] = FIELDS["ranged"][i + 4] + for j, value in enumerate(VALUES["ranged"]): + local_values[j][4] = value[i + 4] + setup_kv_dict(i + 2, layout, local_fields, source, dict_configs_path, local_values) + else: + print "Source", source.name, "incompatible with layout", layout.name + + main_configs = [] + for fname in os.listdir(dict_configs_path): + main_configs.append(os.path.join(dict_configs_path, fname)) + cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs')) + # TODO: add your kv source flag below + node = cluster.add_instance('node', main_configs=main_configs, with_redis=True) + cluster.add_instance('clickhouse1') + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + for dictionary in DICTIONARIES: + print "Preparing", dictionary.name + dictionary.prepare_source(cluster) + print "Prepared" + + yield cluster + + finally: + cluster.shutdown() + + +def prepare_data(fields, values_by_row): + return [Row(fields, values) for values in values_by_row] + + +def test_simple_kv_dictionaries(started_cluster): + simple_kv_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "simple"] + + for dct in simple_kv_dicts: + queries_with_answers = [] + fields = dct.fields + print("FIELDS AND VALUES FOR " + dct.name) + print(fields) + print(dct.values) + data = prepare_data(fields, dct.values) + dct.source.load_kv_data(dct.values) + + try: + node.query("system reload dictionary '{}'".format(dct.name)) + except Exception: + print(dct.name) + raise + + for row in data: + for field in fields: + if not field.is_key: + for query in dct.get_select_get_queries(field, row): + queries_with_answers.append((query, row.get_value_by_name(field.name))) + + for query in dct.get_select_has_queries(field, row): + queries_with_answers.append((query, 1)) + + for query in dct.get_select_get_or_default_queries(field, row): + queries_with_answers.append((query, field.default_value_for_get)) + if dct.fields[1].hierarchical: + for query in dct.get_hierarchical_queries(data[0]): + queries_with_answers.append((query, [1])) + + for query in dct.get_hierarchical_queries(data[1]): + queries_with_answers.append((query, [2, 1])) + + for query in dct.get_is_in_queries(data[0], data[1]): + queries_with_answers.append((query, 0)) + + for query in dct.get_is_in_queries(data[1], data[0]): + queries_with_answers.append((query, 1)) + + for query, answer in queries_with_answers: + if isinstance(answer, list): + answer = str(answer).replace(' ', '') + print query + assert node.query(query) == str(answer) + '\n', query + + +def test_complex_dictionaries(started_cluster): + complex_kv_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "complex"] + + for dct in complex_kv_dicts: + queries_with_answers = [] + fields = dct.fields + print("FIELDS AND VALUES FOR " + dct.name) + print(fields) + print(dct.values) + data = prepare_data(fields, dct.values) + dct.source.load_kv_data(dct.values) + + try: + node.query("system reload dictionary '{}'".format(dct.name)) + except Exception: + print(dct.name) + raise + + for row in data: + for field in fields: + if not field.is_key: + for query in dct.get_select_get_queries(field, row): + queries_with_answers.append((query, row.get_value_by_name(field.name))) + + for query in dct.get_select_has_queries(field, row): + queries_with_answers.append((query, 1)) + + for query in dct.get_select_get_or_default_queries(field, row): + queries_with_answers.append((query, field.default_value_for_get)) + + for query, answer in queries_with_answers: + print query + assert node.query(query) == str(answer) + '\n' + + +def xtest_ranged_dictionaries(started_cluster): + complex_kv_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "ranged"] + + for dct in complex_kv_dicts: + queries_with_answers = [] + fields = dct.fields + print("FIELDS AND VALUES FOR " + dct.name) + print(fields) + print(dct.values) + data = prepare_data(fields, dct.values) + dct.source.load_kv_data(dct.values) + + try: + node.query("system reload dictionary '{}'".format(dct.name)) + except Exception: + print(dct.name) + raise + + for row in data: + for field in fields: + if not field.is_key and not field.is_range: + for query in dct.get_select_get_queries(field, row): + queries_with_answers.append((query, row.get_value_by_name(field.name))) + + for query, answer in queries_with_answers: + print query + assert node.query(query) == str(answer) + '\n'