提交 9d91dbb7 编写于 作者: A alesapin

Finally add mongo source

上级 3515d9bb
......@@ -192,7 +192,8 @@ MongoDBDictionarySource::MongoDBDictionarySource(
{
# if POCO_VERSION >= 0x01070800
Poco::MongoDB::Database poco_db(db);
poco_db.authenticate(*connection, user, password, method.empty() ? Poco::MongoDB::Database::AUTH_SCRAM_SHA1 : method);
if (!poco_db.authenticate(*connection, user, password, method.empty() ? Poco::MongoDB::Database::AUTH_SCRAM_SHA1 : method))
throw Exception("Cannot authenticate in MongoDB, incorrect user or password", ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
# else
authenticate(*connection, db, user, password);
# endif
......
......@@ -14,7 +14,7 @@ Don't use Docker from your system repository.
* [pip](https://pypi.python.org/pypi/pip). To install: `sudo apt-get install python-pip`
* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal`
(highly not recommended) If you really want to use OS packages on modern debian/ubuntu instead of "pip": `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-kazoo python-psycopg2`
......
......@@ -16,6 +16,7 @@ from kazoo.exceptions import KazooException
import psycopg2
import requests
import base64
import pymongo
import docker
from docker.errors import ContainerError
......@@ -99,6 +100,7 @@ class ClickHouseCluster:
self.with_kafka = False
self.with_odbc_drivers = False
self.with_hdfs = False
self.with_mongo = False
self.docker_client = None
self.is_up = False
......@@ -110,7 +112,7 @@ class ClickHouseCluster:
cmd += " client"
return cmd
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, with_hdfs=False, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None):
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None):
"""Add an instance to the cluster.
name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
......@@ -128,7 +130,7 @@ class ClickHouseCluster:
instance = ClickHouseInstance(
self, self.base_dir, name, config_dir, main_configs, user_configs, macros, with_zookeeper,
self.zookeeper_config_path, with_mysql, with_kafka, self.base_configs_dir, self.server_bin_path,
self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, self.base_configs_dir, self.server_bin_path,
self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname,
env_variables=env_variables, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address, ipv6_address=ipv6_address)
......@@ -177,6 +179,11 @@ class ClickHouseCluster:
self.base_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_hdfs.yml')]
if with_mongo and not self.with_mongo:
self.with_mongo = True
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')])
self.base_mongo_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')]
return instance
......@@ -249,6 +256,20 @@ class ClickHouseCluster:
raise Exception("Can't wait HDFS to start")
def wait_mongo_to_start(self, timeout=30):
connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format(
host='localhost', port='27018', user='root', password='clickhouse')
connection = pymongo.MongoClient(connection_str)
start = time.time()
while time.time() - start < timeout:
try:
connection.database_names()
print "Connected to Mongo dbs:", connection.database_names()
return
except Exception as ex:
print "Can't connect to Mongo " + str(ex)
time.sleep(1)
def start(self, destroy_dirs=True):
if self.is_up:
return
......@@ -291,6 +312,10 @@ class ClickHouseCluster:
subprocess_check_call(self.base_hdfs_cmd + ['up', '-d', '--force-recreate'])
self.wait_hdfs_to_start(120)
if self.with_mongo and self.base_mongo_cmd:
subprocess_check_call(self.base_mongo_cmd + ['up', '-d', '--force-recreate'])
self.wait_mongo_to_start(30)
subprocess_check_call(self.base_cmd + ['up', '-d', '--no-recreate'])
start_deadline = time.time() + 20.0 # seconds
......@@ -389,7 +414,7 @@ class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros,
with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, base_configs_dir, server_bin_path, odbc_bridge_bin_path,
with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, base_configs_dir, server_bin_path, odbc_bridge_bin_path,
clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test",
stay_alive=False, ipv4_address=None, ipv6_address=None):
......@@ -413,6 +438,7 @@ class ClickHouseInstance:
self.with_mysql = with_mysql
self.with_kafka = with_kafka
self.with_mongo = with_mongo
self.path = p.join(self.cluster.instances_dir, name)
self.docker_compose_path = p.join(self.path, 'docker_compose.yml')
......
version: '2.2'
services:
mongo1:
image: mongo
image: mongo:3.6
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: root
......
......@@ -24,7 +24,7 @@ RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes -
ENV TZ=Europe/Moscow
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN pip install pytest docker-compose==1.22.0 docker dicttoxml kazoo PyMySQL psycopg2
RUN pip install pytest docker-compose==1.22.0 docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal
ENV DOCKER_CHANNEL stable
ENV DOCKER_VERSION 17.09.1-ce
......@@ -61,4 +61,4 @@ RUN set -x \
VOLUME /var/lib/docker
EXPOSE 2375
ENTRYPOINT ["dockerd-entrypoint.sh"]
CMD []
\ No newline at end of file
CMD []
......@@ -157,6 +157,8 @@ class DictionaryStructure(object):
fields_strs.append(field.name)
return fields_strs
def get_all_fields(self):
return self.keys + self.range_fields + self.ordinary_fields
def _get_dict_get_common_expression(self, dict_name, field, row, or_default, with_type, has):
if field in self.keys:
......
# -*- coding: utf-8 -*-
import warnings
import pymysql.cursors
#import pymongo
import pymongo
from tzlocal import get_localzone
import datetime
import os
......@@ -50,6 +52,7 @@ class SourceMySQL(ExternalSource):
'Float32': 'float',
'Float64': 'double'
}
def create_mysql_conn(self):
self.connection = pymysql.connect(
user=self.user,
......@@ -123,40 +126,55 @@ class SourceMySQL(ExternalSource):
self.execute_mysql_query(query)
#class SourceMongo(ExternalSource):
#
# def get_source_str(self, table_name):
# return '''
# <mongodb>
# <host>{host}</host>
# <port>{port}</port>
# <user>{user}</user>
# <password>{password}</password>
# <db>test</db>
# <collection>{tbl}</collection>
# </mongodb>
# '''.format(
# host=self.docker_hostname,
# port=self.docker_port,
# user=self.user,
# password=self.password,
# tbl=table_name,
# )
#
# def prepare(self, structure, table_name, cluster):
# connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format(
# host=self.internal_hostname, port=self.internal_port,
# user=self.user, password=self.password)
# self.connection = pymongo.MongoClient(connection_str)
# self.connection.create
# self.structure = structure
# self.db = self.connection['test']
# self.prepared = True
#
# def load_data(self, data, table_name):
# tbl = self.db[table_name]
# to_insert = [dict(row.data) for row in data]
# result = tbl.insert_many(to_insert)
class SourceMongo(ExternalSource):
def get_source_str(self, table_name):
return '''
<mongodb>
<host>{host}</host>
<port>{port}</port>
<user>{user}</user>
<password>{password}</password>
<db>test</db>
<collection>{tbl}</collection>
</mongodb>
'''.format(
host=self.docker_hostname,
port=self.docker_port,
user=self.user,
password=self.password,
tbl=table_name,
)
def prepare(self, structure, table_name, cluster):
connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format(
host=self.internal_hostname, port=self.internal_port,
user=self.user, password=self.password)
self.connection = pymongo.MongoClient(connection_str)
self.converters = {}
for field in structure.get_all_fields():
if field.field_type == "Date":
self.converters[field.name] = lambda x: datetime.datetime.strptime(x, "%Y-%m-%d")
elif field.field_type == "DateTime":
self.converters[field.name] = lambda x: get_localzone().localize(datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S"))
else:
self.converters[field.name] = lambda x: x
self.db = self.connection['test']
self.db.add_user(self.user, self.password)
self.prepared = True
def load_data(self, data, table_name):
tbl = self.db[table_name]
to_insert = []
for row in data:
row_dict = {}
for cell_name, cell_value in row.data.items():
row_dict[cell_name] = self.converters[cell_name](cell_value)
to_insert.append(row_dict)
result = tbl.insert_many(to_insert)
class SourceClickHouse(ExternalSource):
......
......@@ -4,7 +4,7 @@ import time
from helpers.cluster import ClickHouseCluster
from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout
from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed
from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed, SourceMongo
from external_sources import SourceHTTP, SourceHTTPS
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
......@@ -80,7 +80,7 @@ LAYOUTS = [
SOURCES = [
# some troubles with that dictionary
#SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"),
SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"),
SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"),
SourceClickHouse("RemoteClickHouse", "localhost", "9000", "clickhouse1", "9000", "default", ""),
SourceClickHouse("LocalClickHouse", "localhost", "9000", "node", "9000", "default", ""),
......@@ -121,7 +121,7 @@ def setup_module(module):
for fname in os.listdir(dict_configs_path):
main_configs.append(os.path.join(dict_configs_path, fname))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True)
node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True)
cluster.add_instance('clickhouse1')
@pytest.fixture(scope="module")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册