From 9d91dbb7c591d88d4d504145725279f6a913381d Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 25 Feb 2019 13:45:22 +0300 Subject: [PATCH] Finally add mongo source --- .../Dictionaries/MongoDBDictionarySource.cpp | 3 +- dbms/tests/integration/README.md | 2 +- dbms/tests/integration/helpers/cluster.py | 32 ++++++- .../helpers/docker_compose_mongo.yml | 2 +- dbms/tests/integration/image/Dockerfile | 4 +- .../test_external_dictionaries/dictionary.py | 2 + .../external_sources.py | 88 +++++++++++-------- .../test_external_dictionaries/test.py | 6 +- 8 files changed, 93 insertions(+), 46 deletions(-) diff --git a/dbms/src/Dictionaries/MongoDBDictionarySource.cpp b/dbms/src/Dictionaries/MongoDBDictionarySource.cpp index ec7ddc033f..73ffd4727f 100644 --- a/dbms/src/Dictionaries/MongoDBDictionarySource.cpp +++ b/dbms/src/Dictionaries/MongoDBDictionarySource.cpp @@ -192,7 +192,8 @@ MongoDBDictionarySource::MongoDBDictionarySource( { # if POCO_VERSION >= 0x01070800 Poco::MongoDB::Database poco_db(db); - poco_db.authenticate(*connection, user, password, method.empty() ? Poco::MongoDB::Database::AUTH_SCRAM_SHA1 : method); + if (!poco_db.authenticate(*connection, user, password, method.empty() ? Poco::MongoDB::Database::AUTH_SCRAM_SHA1 : method)) + throw Exception("Cannot authenticate in MongoDB, incorrect user or password", ErrorCodes::MONGODB_CANNOT_AUTHENTICATE); # else authenticate(*connection, db, user, password); # endif diff --git a/dbms/tests/integration/README.md b/dbms/tests/integration/README.md index a1482a7c7c..6a707f1ea0 100644 --- a/dbms/tests/integration/README.md +++ b/dbms/tests/integration/README.md @@ -14,7 +14,7 @@ Don't use Docker from your system repository. * [pip](https://pypi.python.org/pypi/pip). To install: `sudo apt-get install python-pip` * [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest` -* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2` +* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal` (highly not recommended) If you really want to use OS packages on modern debian/ubuntu instead of "pip": `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-kazoo python-psycopg2` diff --git a/dbms/tests/integration/helpers/cluster.py b/dbms/tests/integration/helpers/cluster.py index c6b1863432..8bd930ac1b 100644 --- a/dbms/tests/integration/helpers/cluster.py +++ b/dbms/tests/integration/helpers/cluster.py @@ -16,6 +16,7 @@ from kazoo.exceptions import KazooException import psycopg2 import requests import base64 +import pymongo import docker from docker.errors import ContainerError @@ -99,6 +100,7 @@ class ClickHouseCluster: self.with_kafka = False self.with_odbc_drivers = False self.with_hdfs = False + self.with_mongo = False self.docker_client = None self.is_up = False @@ -110,7 +112,7 @@ class ClickHouseCluster: cmd += " client" return cmd - def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, with_hdfs=False, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None): + def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None): """Add an instance to the cluster. name - the name of the instance directory and the value of the 'instance' macro in ClickHouse. @@ -128,7 +130,7 @@ class ClickHouseCluster: instance = ClickHouseInstance( self, self.base_dir, name, config_dir, main_configs, user_configs, macros, with_zookeeper, - self.zookeeper_config_path, with_mysql, with_kafka, self.base_configs_dir, self.server_bin_path, + self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, self.base_configs_dir, self.server_bin_path, self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname, env_variables=env_variables, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address, ipv6_address=ipv6_address) @@ -177,6 +179,11 @@ class ClickHouseCluster: self.base_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_hdfs.yml')] + if with_mongo and not self.with_mongo: + self.with_mongo = True + self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')]) + self.base_mongo_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', + self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')] return instance @@ -249,6 +256,20 @@ class ClickHouseCluster: raise Exception("Can't wait HDFS to start") + def wait_mongo_to_start(self, timeout=30): + connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format( + host='localhost', port='27018', user='root', password='clickhouse') + connection = pymongo.MongoClient(connection_str) + start = time.time() + while time.time() - start < timeout: + try: + connection.database_names() + print "Connected to Mongo dbs:", connection.database_names() + return + except Exception as ex: + print "Can't connect to Mongo " + str(ex) + time.sleep(1) + def start(self, destroy_dirs=True): if self.is_up: return @@ -291,6 +312,10 @@ class ClickHouseCluster: subprocess_check_call(self.base_hdfs_cmd + ['up', '-d', '--force-recreate']) self.wait_hdfs_to_start(120) + if self.with_mongo and self.base_mongo_cmd: + subprocess_check_call(self.base_mongo_cmd + ['up', '-d', '--force-recreate']) + self.wait_mongo_to_start(30) + subprocess_check_call(self.base_cmd + ['up', '-d', '--no-recreate']) start_deadline = time.time() + 20.0 # seconds @@ -389,7 +414,7 @@ class ClickHouseInstance: def __init__( self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros, - with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, base_configs_dir, server_bin_path, odbc_bridge_bin_path, + with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, base_configs_dir, server_bin_path, odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None): @@ -413,6 +438,7 @@ class ClickHouseInstance: self.with_mysql = with_mysql self.with_kafka = with_kafka + self.with_mongo = with_mongo self.path = p.join(self.cluster.instances_dir, name) self.docker_compose_path = p.join(self.path, 'docker_compose.yml') diff --git a/dbms/tests/integration/helpers/docker_compose_mongo.yml b/dbms/tests/integration/helpers/docker_compose_mongo.yml index f9d3b15ba0..a593c3e123 100644 --- a/dbms/tests/integration/helpers/docker_compose_mongo.yml +++ b/dbms/tests/integration/helpers/docker_compose_mongo.yml @@ -1,7 +1,7 @@ version: '2.2' services: mongo1: - image: mongo + image: mongo:3.6 restart: always environment: MONGO_INITDB_ROOT_USERNAME: root diff --git a/dbms/tests/integration/image/Dockerfile b/dbms/tests/integration/image/Dockerfile index 118968bd74..6ee674448a 100644 --- a/dbms/tests/integration/image/Dockerfile +++ b/dbms/tests/integration/image/Dockerfile @@ -24,7 +24,7 @@ RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes - ENV TZ=Europe/Moscow RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone -RUN pip install pytest docker-compose==1.22.0 docker dicttoxml kazoo PyMySQL psycopg2 +RUN pip install pytest docker-compose==1.22.0 docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal ENV DOCKER_CHANNEL stable ENV DOCKER_VERSION 17.09.1-ce @@ -61,4 +61,4 @@ RUN set -x \ VOLUME /var/lib/docker EXPOSE 2375 ENTRYPOINT ["dockerd-entrypoint.sh"] -CMD [] \ No newline at end of file +CMD [] diff --git a/dbms/tests/integration/test_external_dictionaries/dictionary.py b/dbms/tests/integration/test_external_dictionaries/dictionary.py index d67fc1d92a..bdddc7a960 100644 --- a/dbms/tests/integration/test_external_dictionaries/dictionary.py +++ b/dbms/tests/integration/test_external_dictionaries/dictionary.py @@ -157,6 +157,8 @@ class DictionaryStructure(object): fields_strs.append(field.name) return fields_strs + def get_all_fields(self): + return self.keys + self.range_fields + self.ordinary_fields def _get_dict_get_common_expression(self, dict_name, field, row, or_default, with_type, has): if field in self.keys: diff --git a/dbms/tests/integration/test_external_dictionaries/external_sources.py b/dbms/tests/integration/test_external_dictionaries/external_sources.py index a6213fd14e..71dc05ca78 100644 --- a/dbms/tests/integration/test_external_dictionaries/external_sources.py +++ b/dbms/tests/integration/test_external_dictionaries/external_sources.py @@ -1,7 +1,9 @@ # -*- coding: utf-8 -*- import warnings import pymysql.cursors -#import pymongo +import pymongo +from tzlocal import get_localzone +import datetime import os @@ -50,6 +52,7 @@ class SourceMySQL(ExternalSource): 'Float32': 'float', 'Float64': 'double' } + def create_mysql_conn(self): self.connection = pymysql.connect( user=self.user, @@ -123,40 +126,55 @@ class SourceMySQL(ExternalSource): self.execute_mysql_query(query) -#class SourceMongo(ExternalSource): -# -# def get_source_str(self, table_name): -# return ''' -# -# {host} -# {port} -# {user} -# {password} -# test -# {tbl} -# -# '''.format( -# host=self.docker_hostname, -# port=self.docker_port, -# user=self.user, -# password=self.password, -# tbl=table_name, -# ) -# -# def prepare(self, structure, table_name, cluster): -# connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format( -# host=self.internal_hostname, port=self.internal_port, -# user=self.user, password=self.password) -# self.connection = pymongo.MongoClient(connection_str) -# self.connection.create -# self.structure = structure -# self.db = self.connection['test'] -# self.prepared = True -# -# def load_data(self, data, table_name): -# tbl = self.db[table_name] -# to_insert = [dict(row.data) for row in data] -# result = tbl.insert_many(to_insert) +class SourceMongo(ExternalSource): + + def get_source_str(self, table_name): + return ''' + + {host} + {port} + {user} + {password} + test + {tbl} + + '''.format( + host=self.docker_hostname, + port=self.docker_port, + user=self.user, + password=self.password, + tbl=table_name, + ) + + def prepare(self, structure, table_name, cluster): + connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format( + host=self.internal_hostname, port=self.internal_port, + user=self.user, password=self.password) + self.connection = pymongo.MongoClient(connection_str) + self.converters = {} + for field in structure.get_all_fields(): + if field.field_type == "Date": + self.converters[field.name] = lambda x: datetime.datetime.strptime(x, "%Y-%m-%d") + elif field.field_type == "DateTime": + self.converters[field.name] = lambda x: get_localzone().localize(datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S")) + else: + self.converters[field.name] = lambda x: x + + self.db = self.connection['test'] + self.db.add_user(self.user, self.password) + self.prepared = True + + def load_data(self, data, table_name): + tbl = self.db[table_name] + + to_insert = [] + for row in data: + row_dict = {} + for cell_name, cell_value in row.data.items(): + row_dict[cell_name] = self.converters[cell_name](cell_value) + to_insert.append(row_dict) + + result = tbl.insert_many(to_insert) class SourceClickHouse(ExternalSource): diff --git a/dbms/tests/integration/test_external_dictionaries/test.py b/dbms/tests/integration/test_external_dictionaries/test.py index 64006d22f7..7878620a65 100644 --- a/dbms/tests/integration/test_external_dictionaries/test.py +++ b/dbms/tests/integration/test_external_dictionaries/test.py @@ -4,7 +4,7 @@ import time from helpers.cluster import ClickHouseCluster from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout -from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed +from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed, SourceMongo from external_sources import SourceHTTP, SourceHTTPS SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -80,7 +80,7 @@ LAYOUTS = [ SOURCES = [ # some troubles with that dictionary - #SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"), + SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"), SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"), SourceClickHouse("RemoteClickHouse", "localhost", "9000", "clickhouse1", "9000", "default", ""), SourceClickHouse("LocalClickHouse", "localhost", "9000", "node", "9000", "default", ""), @@ -121,7 +121,7 @@ def setup_module(module): for fname in os.listdir(dict_configs_path): main_configs.append(os.path.join(dict_configs_path, fname)) cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs')) - node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True) + node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True) cluster.add_instance('clickhouse1') @pytest.fixture(scope="module") -- GitLab