import base64
import distutils.dir_util
import errno
import os
import os.path as p
import pwd
import re
import shutil
import socket
import subprocess
import time
import urllib
import xml.dom.minidom
import logging
import docker
import psycopg2
import pymongo
import pymysql
from dicttoxml import dicttoxml
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
from minio import Minio

from .client import Client
from .hdfs_api import HDFSApi

HELPERS_DIR = p.dirname(__file__)
DEFAULT_ENV_NAME = 'env_file'

SANITIZER_SIGN = "=================="


def _create_env_file(path, variables, fname=DEFAULT_ENV_NAME):
    full_path = os.path.join(path, fname)
    with open(full_path, 'w') as f:
        for var, value in variables.items():
            f.write("=".join([var, value]) + "\n")
    return full_path


def subprocess_check_call(args):
    # Uncomment for debugging
    # print('run:', ' ' . join(args))
    subprocess.check_call(args)


def subprocess_call(args):
    # Uncomment for debugging
    # print('run:', ' ' . join(args))
    # Return the exit code so callers can check whether the command succeeded.
    return subprocess.call(args)


def get_odbc_bridge_path():
    path = os.environ.get('CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH')
    if path is None:
        server_path = os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH')
        if server_path is not None:
            return os.path.join(os.path.dirname(server_path), 'clickhouse-odbc-bridge')
        else:
            return '/usr/bin/clickhouse-odbc-bridge'
    return path


class ClickHouseCluster:
    """ClickHouse cluster with several instances and (possibly) ZooKeeper.

    Add instances with several calls to add_instance(), then start them with the start() call.

    Directories for instances are created in the directory of base_path. After the cluster is started,
    these directories will contain logs, database files, the docker-compose config, ClickHouse configs, etc.
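
    A minimal usage sketch (the instance name, option and query below are illustrative, not defaults of this module):

        cluster = ClickHouseCluster(__file__)
        node = cluster.add_instance('node1', with_zookeeper=True)
        cluster.start()
        try:
            node.query('SELECT 1')
        finally:
            cluster.shutdown()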
    """

    def __init__(self, base_path, name=None, base_configs_dir=None, server_bin_path=None, client_bin_path=None,
                 odbc_bridge_bin_path=None, zookeeper_config_path=None, custom_dockerd_host=None):
        self.base_dir = p.dirname(base_path)
        self.name = name if name is not None else ''

        self.base_configs_dir = base_configs_dir or os.environ.get('CLICKHOUSE_TESTS_BASE_CONFIG_DIR',
                                                                   '/etc/clickhouse-server/')
        self.server_bin_path = p.realpath(
            server_bin_path or os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH', '/usr/bin/clickhouse'))
        self.odbc_bridge_bin_path = p.realpath(odbc_bridge_bin_path or get_odbc_bridge_path())
        self.client_bin_path = p.realpath(
            client_bin_path or os.environ.get('CLICKHOUSE_TESTS_CLIENT_BIN_PATH', '/usr/bin/clickhouse-client'))
        self.zookeeper_config_path = p.join(self.base_dir, zookeeper_config_path) if zookeeper_config_path else p.join(
            HELPERS_DIR, 'zookeeper_config.xml')

        self.project_name = pwd.getpwuid(os.getuid()).pw_name + p.basename(self.base_dir) + self.name
        # docker-compose removes everything non-alphanumeric from project names so we do it too.
        self.project_name = re.sub(r'[^a-z0-9]', '', self.project_name.lower())
        self.instances_dir = p.join(self.base_dir, '_instances' + ('' if not self.name else '_' + self.name))
        self.docker_logs_path = p.join(self.instances_dir, 'docker.log')

        custom_dockerd_host = custom_dockerd_host or os.environ.get('CLICKHOUSE_TESTS_DOCKERD_HOST')
        self.docker_api_version = os.environ.get("DOCKER_API_VERSION")

        self.base_cmd = ['docker-compose']
        if custom_dockerd_host:
            self.base_cmd += ['--host', custom_dockerd_host]

        self.base_cmd += ['--project-directory', self.base_dir, '--project-name', self.project_name]
        self.base_zookeeper_cmd = None
        self.base_mysql_cmd = []
        self.base_kafka_cmd = []
        self.pre_zookeeper_commands = []
        self.instances = {}
        self.with_zookeeper = False
        self.with_mysql = False
        self.with_postgres = False
        self.with_kafka = False
        self.with_odbc_drivers = False
        self.with_hdfs = False
        self.with_mongo = False
        self.with_net_trics = False
        self.with_redis = False

        self.with_minio = False
        self.minio_host = "minio1"
        self.minio_bucket = "root"
        self.minio_port = 9001
        self.minio_client = None  # type: Minio
        self.minio_redirect_host = "redirect"
        self.minio_redirect_port = 80

        self.docker_client = None
        self.is_up = False

    def get_client_cmd(self):
        cmd = self.client_bin_path
        if p.basename(cmd) == 'clickhouse':
            cmd += " client"
        return cmd

    def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={},
                     with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None,
                     with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False,
                     with_redis=False, with_minio=False,
                     hostname=None, env_variables={}, image="yandex/clickhouse-integration-test",
                     stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=[]):
        """Add an instance to the cluster.

        name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
        config_dir - a directory with config files whose contents will be copied to the /etc/clickhouse-server/ directory
        main_configs - a list of config files that will be added to the config.d/ directory
        user_configs - a list of config files that will be added to the users.d/ directory
        with_zookeeper - if True, add ZooKeeper configuration to configs and ZooKeeper instances to the cluster.
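
        For example (an illustrative sketch; the config file name is a placeholder, not shipped with this helper):

            node1 = cluster.add_instance('node1',
                                         main_configs=['configs/remote_servers.xml'],
                                         with_zookeeper=True)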
        """

        if self.is_up:
            raise Exception("Can't add instance %s: cluster is already up!" % name)

        if name in self.instances:
            raise Exception("Can't add instance `%s': there is already an instance with the same name!" % name)

        instance = ClickHouseInstance(
            self, self.base_dir, name, config_dir, main_configs, user_configs, macros, with_zookeeper,
            self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio,
            self.base_configs_dir, self.server_bin_path,
            self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname,
            env_variables=env_variables, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address,
            ipv6_address=ipv6_address,
            with_installed_binary=with_installed_binary, tmpfs=tmpfs)

        self.instances[name] = instance
        if ipv4_address is not None or ipv6_address is not None:
            self.with_net_trics = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_net.yml')])

        self.base_cmd.extend(['--file', instance.docker_compose_path])

        cmds = []
        if with_zookeeper and not self.with_zookeeper:
            self.with_zookeeper = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')])
            self.base_zookeeper_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                       self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')]
            cmds.append(self.base_zookeeper_cmd)

        if with_mysql and not self.with_mysql:
            self.with_mysql = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')])
            self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                   self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]

            cmds.append(self.base_mysql_cmd)

        if with_postgres and not self.with_postgres:
            self.with_postgres = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')])
            self.base_postgres_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                      self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')]
            cmds.append(self.base_postgres_cmd)

        if with_odbc_drivers and not self.with_odbc_drivers:
            self.with_odbc_drivers = True
            if not self.with_mysql:
                self.with_mysql = True
                self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')])
                self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                       self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]
                cmds.append(self.base_mysql_cmd)

            if not self.with_postgres:
                self.with_postgres = True
                self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')])
                self.base_postgres_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                          self.project_name, '--file',
                                          p.join(HELPERS_DIR, 'docker_compose_postgres.yml')]
                cmds.append(self.base_postgres_cmd)

        if with_kafka and not self.with_kafka:
            self.with_kafka = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_kafka.yml')])
            self.base_kafka_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                   self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_kafka.yml')]
            cmds.append(self.base_kafka_cmd)

        if with_hdfs and not self.with_hdfs:
            self.with_hdfs = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_hdfs.yml')])
            self.base_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                  self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_hdfs.yml')]
            cmds.append(self.base_hdfs_cmd)

        if with_mongo and not self.with_mongo:
            self.with_mongo = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')])
            self.base_mongo_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                   self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')]
            cmds.append(self.base_mongo_cmd)

        if self.with_net_trics:
            for cmd in cmds:
                cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_net.yml')])

        if with_redis and not self.with_redis:
            self.with_redis = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_redis.yml')])
            self.base_redis_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                   self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_redis.yml')]

        if with_minio and not self.with_minio:
            self.with_minio = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_minio.yml')])
            self.base_minio_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                   self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_minio.yml')]
            cmds.append(self.base_minio_cmd)

        return instance

    def get_instance_docker_id(self, instance_name):
        # According to how docker-compose names containers.
        return self.project_name + '_' + instance_name + '_1'

    def _replace(self, path, what, to):
        with open(path, 'r') as f:
            data = f.read()
        data = data.replace(what, to)
        with open(path, 'w') as f:
            f.write(data)

    def restart_instance_with_ip_change(self, node, new_ip):
        if '::' in new_ip:
            if node.ipv6_address is None:
                raise Exception("You should specify ipv6_address in add_instance method")
            self._replace(node.docker_compose_path, node.ipv6_address, new_ip)
            node.ipv6_address = new_ip
        else:
            if node.ipv4_address is None:
                raise Exception("You should specify ipv4_address in add_instance method")
            self._replace(node.docker_compose_path, node.ipv4_address, new_ip)
            node.ipv4_address = new_ip
        subprocess.check_call(self.base_cmd + ["stop", node.name])
        subprocess.check_call(self.base_cmd + ["rm", "--force", "--stop", node.name])
        subprocess.check_call(self.base_cmd + ["up", "--force-recreate", "--no-deps", "-d", node.name])
        node.ip_address = self.get_instance_ip(node.name)
        node.client = Client(node.ip_address, command=self.client_bin_path)
        start_deadline = time.time() + 20.0  # seconds
        node.wait_for_start(start_deadline)
        return node

    def get_instance_ip(self, instance_name):
        docker_id = self.get_instance_docker_id(instance_name)
        handle = self.docker_client.containers.get(docker_id)
        return handle.attrs['NetworkSettings']['Networks'].values()[0]['IPAddress']

    def wait_mysql_to_start(self, timeout=60):
        start = time.time()
        while time.time() - start < timeout:
            try:
                conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=3308)
                conn.close()
                print "Mysql Started"
                return
            except Exception as ex:
                print "Can't connect to MySQL " + str(ex)
                time.sleep(0.5)

        raise Exception("Cannot wait for MySQL container to start")

    def wait_postgres_to_start(self, timeout=60):
        start = time.time()
        while time.time() - start < timeout:
            try:
                conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
                conn = psycopg2.connect(conn_string)
                conn.close()
                print "Postgres Started"
                return
            except Exception as ex:
                print "Can't connect to Postgres " + str(ex)
                time.sleep(0.5)

        raise Exception("Cannot wait for Postgres container to start")

    def wait_zookeeper_to_start(self, timeout=60):
        start = time.time()
        while time.time() - start < timeout:
            try:
                for instance in ['zoo1', 'zoo2', 'zoo3']:
                    conn = self.get_kazoo_client(instance)
                    conn.get_children('/')
                print "All instances of ZooKeeper started"
                return
            except Exception as ex:
                print "Can't connect to ZooKeeper " + str(ex)
                time.sleep(0.5)

        raise Exception("Cannot wait for ZooKeeper container to start")

    def wait_hdfs_to_start(self, timeout=60):
        hdfs_api = HDFSApi("root")
        start = time.time()
        while time.time() - start < timeout:
            try:
                hdfs_api.write_data("/somefilewithrandomname222", "1")
                print "Connected to HDFS and SafeMode disabled! "
                return
            except Exception as ex:
                print "Can't connect to HDFS " + str(ex)
                time.sleep(1)

        raise Exception("Can't wait for HDFS to start")

    def wait_mongo_to_start(self, timeout=30):
        connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format(
            host='localhost', port='27018', user='root', password='clickhouse')
        connection = pymongo.MongoClient(connection_str)
        start = time.time()
        while time.time() - start < timeout:
            try:
                connection.database_names()
                print "Connected to Mongo dbs:", connection.database_names()
                return
            except Exception as ex:
                print "Can't connect to Mongo " + str(ex)
                time.sleep(1)

    def wait_minio_to_start(self, timeout=10):
        minio_client = Minio('localhost:9001',
                             access_key='minio',
                             secret_key='minio123',
                             secure=False)
        start = time.time()
        while time.time() - start < timeout:
            try:
                buckets = minio_client.list_buckets()
                self.minio_client = minio_client
                logging.info("Connected to Minio %s", buckets)
                return
            except Exception as ex:
                logging.warning("Can't connect to Minio: %s", str(ex))
                time.sleep(1)

    def start(self, destroy_dirs=True):
        if self.is_up:
            return

        # Just in case kill unstopped containers from previous launch
        try:
            logging.info("Trying to kill unstopped containers...")

            if not subprocess_call(['docker-compose', 'kill']):
                subprocess_call(['docker-compose', 'down', '--volumes'])
        except:
            pass

        logging.info("Unstopped containers killed")

        if destroy_dirs and p.exists(self.instances_dir):
            logging.info("Removing instances dir %s", self.instances_dir)
            shutil.rmtree(self.instances_dir)

        for instance in self.instances.values():
            instance.create_dir(destroy_dir=destroy_dirs)

        self.docker_client = docker.from_env(version=self.docker_api_version)

        common_opts = ['up', '-d', '--force-recreate']

        if self.with_zookeeper and self.base_zookeeper_cmd:
            subprocess_check_call(self.base_zookeeper_cmd + common_opts)
            for command in self.pre_zookeeper_commands:
                self.run_kazoo_commands_with_retries(command, repeats=5)
            self.wait_zookeeper_to_start(120)

        if self.with_mysql and self.base_mysql_cmd:
            subprocess_check_call(self.base_mysql_cmd + common_opts)
            self.wait_mysql_to_start(120)

        if self.with_postgres and self.base_postgres_cmd:
            subprocess_check_call(self.base_postgres_cmd + common_opts)
            self.wait_postgres_to_start(120)

        if self.with_kafka and self.base_kafka_cmd:
            subprocess_check_call(self.base_kafka_cmd + common_opts + ['--renew-anon-volumes'])
            self.kafka_docker_id = self.get_instance_docker_id('kafka1')

        if self.with_hdfs and self.base_hdfs_cmd:
            subprocess_check_call(self.base_hdfs_cmd + common_opts)
            self.wait_hdfs_to_start(120)

        if self.with_mongo and self.base_mongo_cmd:
            subprocess_check_call(self.base_mongo_cmd + common_opts)
            self.wait_mongo_to_start(30)

        if self.with_redis and self.base_redis_cmd:
            subprocess_check_call(self.base_redis_cmd + ['up', '-d', '--force-recreate'])
            time.sleep(10)

        if self.with_minio and self.base_minio_cmd:
            minio_start_cmd = self.base_minio_cmd + common_opts
            logging.info("Trying to create Minio instance by command %s", ' '.join(map(str, minio_start_cmd)))
            subprocess_check_call(minio_start_cmd)
            logging.info("Trying to connect to Minio...")
            self.wait_minio_to_start()

        clickhouse_start_cmd = self.base_cmd + ['up', '-d', '--no-recreate']
        logging.info("Trying to create ClickHouse instance by command %s", ' '.join(map(str, clickhouse_start_cmd)))
        subprocess_check_call(clickhouse_start_cmd)
        logging.info("ClickHouse instance created")

        start_deadline = time.time() + 20.0  # seconds
        for instance in self.instances.itervalues():
            instance.docker_client = self.docker_client
            instance.ip_address = self.get_instance_ip(instance.name)

            logging.info("Waiting for ClickHouse start...")
            instance.wait_for_start(start_deadline)
            logging.info("ClickHouse started")

            instance.client = Client(instance.ip_address, command=self.client_bin_path)

        self.is_up = True

    def shutdown(self, kill=True):
        sanitizer_assert_instance = None
        with open(self.docker_logs_path, "w+") as f:
            subprocess.check_call(self.base_cmd + ['logs'], stdout=f)
            f.seek(0)
            for line in f:
                if SANITIZER_SIGN in line:
                    sanitizer_assert_instance = line.split('|')[0].strip()
                    break

        if kill:
            subprocess_check_call(self.base_cmd + ['kill'])
        subprocess_check_call(self.base_cmd + ['down', '--volumes', '--remove-orphans'])

        self.is_up = False

        self.docker_client = None

        for instance in self.instances.values():
            instance.docker_client = None
            instance.ip_address = None
            instance.client = None

        if sanitizer_assert_instance is not None:
            raise Exception("Sanitizer assert found in {} for instance {}".format(self.docker_logs_path, sanitizer_assert_instance))


    def open_bash_shell(self, instance_name):
        os.system(' '.join(self.base_cmd + ['exec', instance_name, '/bin/bash']))

    def get_kazoo_client(self, zoo_instance_name):
        zk = KazooClient(hosts=self.get_instance_ip(zoo_instance_name))
        zk.start()
        return zk

    def run_kazoo_commands_with_retries(self, kazoo_callback, zoo_instance_name='zoo1', repeats=1, sleep_for=1):
        for i in range(repeats - 1):
            try:
                kazoo_callback(self.get_kazoo_client(zoo_instance_name))
                return
            except KazooException as e:
                print repr(e)
                time.sleep(sleep_for)

        kazoo_callback(self.get_kazoo_client(zoo_instance_name))

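    # Registers a kazoo callback that run_kazoo_commands_with_retries() executes right after the
    # ZooKeeper containers are brought up in start(). Illustrative example (the path is a placeholder):
    #     cluster.add_zookeeper_startup_command(lambda zk: zk.ensure_path('/some/path'))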
    def add_zookeeper_startup_command(self, command):
        self.pre_zookeeper_commands.append(command)


CLICKHOUSE_START_COMMAND = "clickhouse server --config-file=/etc/clickhouse-server/config.xml --log-file=/var/log/clickhouse-server/clickhouse-server.log --errorlog-file=/var/log/clickhouse-server/clickhouse-server.err.log"

CLICKHOUSE_STAY_ALIVE_COMMAND = 'bash -c "{} --daemon; tail -f /dev/null"'.format(CLICKHOUSE_START_COMMAND)

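# Template for the per-instance docker-compose file; the placeholders are filled in by the
# DOCKER_COMPOSE_TEMPLATE.format(...) call at the end of ClickHouseInstance.create_dir().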
DOCKER_COMPOSE_TEMPLATE = '''
version: '2.2'
services:
    {name}:
        image: {image}
        hostname: {hostname}
        volumes:
            - {configs_dir}:/etc/clickhouse-server/
            - {db_dir}:/var/lib/clickhouse/
            - {logs_dir}:/var/log/clickhouse-server/
            {binary_volume}
            {odbc_bridge_volume}
            {odbc_ini_path}
        entrypoint: {entrypoint_cmd}
        tmpfs: {tmpfs}
        cap_add:
            - SYS_PTRACE
        depends_on: {depends_on}
        user: '{user}'
        env_file:
            - {env_file}
        security_opt:
            - label:disable
        {networks}
            {app_net}
                {ipv4_address}
                {ipv6_address}
                {net_aliases}
                    {net_alias1}
'''


class ClickHouseInstance:

    def __init__(
            self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros,
            with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio,
            base_configs_dir, server_bin_path, odbc_bridge_bin_path,
            clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables={},
            image="yandex/clickhouse-integration-test",
            stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=[]):

        self.name = name
        self.base_cmd = cluster.base_cmd[:]
        self.docker_id = cluster.get_instance_docker_id(self.name)
        self.cluster = cluster
        self.hostname = hostname if hostname is not None else self.name

        self.tmpfs = tmpfs[:]
        self.custom_config_dir = p.abspath(p.join(base_path, custom_config_dir)) if custom_config_dir else None
        self.custom_main_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_main_configs]
        self.custom_user_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_user_configs]
        self.clickhouse_path_dir = p.abspath(p.join(base_path, clickhouse_path_dir)) if clickhouse_path_dir else None
        self.macros = macros if macros is not None else {}
        self.with_zookeeper = with_zookeeper
        self.zookeeper_config_path = zookeeper_config_path

        self.base_configs_dir = base_configs_dir
        self.server_bin_path = server_bin_path
        self.odbc_bridge_bin_path = odbc_bridge_bin_path

        self.with_mysql = with_mysql
        self.with_kafka = with_kafka
        self.with_mongo = with_mongo
        self.with_redis = with_redis
        self.with_minio = with_minio

        self.path = p.join(self.cluster.instances_dir, name)
        self.docker_compose_path = p.join(self.path, 'docker_compose.yml')
        self.env_variables = env_variables
        if with_odbc_drivers:
            self.odbc_ini_path = os.path.dirname(self.docker_compose_path) + "/odbc.ini:/etc/odbc.ini"
            self.with_mysql = True
        else:
            self.odbc_ini_path = ""

        self.docker_client = None
        self.ip_address = None
        self.client = None
        self.default_timeout = 20.0  # 20 sec
        self.image = image
        self.stay_alive = stay_alive
        self.ipv4_address = ipv4_address
        self.ipv6_address = ipv6_address
        self.with_installed_binary = with_installed_binary

    # Connects to the instance via clickhouse-client, sends a query (1st argument) and returns the answer
    def query(self, sql, stdin=None, timeout=None, settings=None, user=None, ignore_error=False):
        return self.client.query(sql, stdin, timeout, settings, user, ignore_error)

    def query_with_retry(self, sql, stdin=None, timeout=None, settings=None, user=None, ignore_error=False,
                         retry_count=20, sleep_time=0.5, check_callback=lambda x: True):
        result = None
        for i in range(retry_count):
            try:
                result = self.query(sql, stdin, timeout, settings, user, ignore_error)
                if check_callback(result):
                    return result
                time.sleep(sleep_time)
            except Exception as ex:
                print "Retry {} got exception {}".format(i + 1, ex)
                time.sleep(sleep_time)

        if result is not None:
            return result
        raise Exception("Can't execute query {}".format(sql))

    # Same as query(), but doesn't wait for the response and returns a response handler instead
    def get_query_request(self, *args, **kwargs):
        return self.client.get_query_request(*args, **kwargs)

    # Connects to the instance via clickhouse-client, sends a query (1st argument), expects an error and returns its code
    def query_and_get_error(self, sql, stdin=None, timeout=None, settings=None, user=None):
        return self.client.query_and_get_error(sql, stdin, timeout, settings, user)

    # The same as query_and_get_error but ignores successful query.
    def query_and_get_answer_with_error(self, sql, stdin=None, timeout=None, settings=None, user=None):
        return self.client.query_and_get_answer_with_error(sql, stdin, timeout, settings, user)

    # Connects to the instance via HTTP interface, sends a query and returns the answer
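    # Illustrative examples (the table name is hypothetical): node.http_query("SELECT 1") should return "1\n",
    # and node.http_query("INSERT INTO some_table FORMAT TSV", data="1\n") sends the data as the POST body.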
    def http_query(self, sql, data=None, params=None, user=None):
        if params is None:
            params = {}
        else:
            params = params.copy()

        params["query"] = sql

        auth = ""
        if user:
            auth = "{}@".format(user)

        url = "http://" + auth + self.ip_address + ":8123/?" + urllib.urlencode(params)

        return urllib.urlopen(url, data).read()

    def kill_clickhouse(self, stop_start_wait_sec=5):
        pid = self.get_process_pid("clickhouse")
        if not pid:
            raise Exception("No clickhouse found")
        self.exec_in_container(["bash", "-c", "kill -9 {}".format(pid)], user='root')
        time.sleep(stop_start_wait_sec)

    def restore_clickhouse(self, retries=100):
        pid = self.get_process_pid("clickhouse")
        if pid:
            raise Exception("ClickHouse has already started")
        self.exec_in_container(["bash", "-c", "{} --daemon".format(CLICKHOUSE_START_COMMAND)], user=str(os.getuid()))
        from helpers.test_tools import assert_eq_with_retry
        # wait start
        assert_eq_with_retry(self, "select 1", "1", retry_count=retries)

    def restart_clickhouse(self, stop_start_wait_sec=5, kill=False):
        if not self.stay_alive:
            raise Exception("ClickHouse can be restarted only if the instance was created with stay_alive=True")

        self.exec_in_container(["bash", "-c", "pkill {} clickhouse".format("-9" if kill else "")], user='root')
        time.sleep(stop_start_wait_sec)
        self.exec_in_container(["bash", "-c", "{} --daemon".format(CLICKHOUSE_START_COMMAND)], user=str(os.getuid()))

    def exec_in_container(self, cmd, detach=False, **kwargs):
        container = self.get_docker_handle()
        exec_id = self.docker_client.api.exec_create(container.id, cmd, **kwargs)
        output = self.docker_client.api.exec_start(exec_id, detach=detach)

        output = output.decode('utf8')
        exit_code = self.docker_client.api.exec_inspect(exec_id)['ExitCode']
        if exit_code:
            raise Exception('Cmd "{}" failed! Return code {}. Output: {}'.format(' '.join(cmd), exit_code, output))
        return output

    def contains_in_log(self, substring):
        result = self.exec_in_container(
            ["bash", "-c", "grep '{}' /var/log/clickhouse-server/clickhouse-server.log || true".format(substring)])
        return len(result) > 0

    def copy_file_to_container(self, local_path, dest_path):
        with open(local_path, 'r') as fdata:
            data = fdata.read()
            encoded_data = base64.b64encode(data)
            self.exec_in_container(["bash", "-c", "echo {} | base64 --decode > {}".format(encoded_data, dest_path)],
                                   user='root')

    def get_process_pid(self, process_name):
        output = self.exec_in_container(["bash", "-c",
                                         "ps ax | grep '{}' | grep -v 'grep' | grep -v 'bash -c' | awk '{{print $1}}'".format(
                                             process_name)])
        if output:
            try:
                pid = int(output.split('\n')[0].strip())
                return pid
            except:
                return None
        return None

    def restart_with_latest_version(self, stop_start_wait_sec=10, callback_onstop=None, signal=15):
        if not self.stay_alive:
            raise Exception("Cannot restart a container that was not started with stay_alive=True")
        self.exec_in_container(["bash", "-c", "pkill -{} clickhouse".format(signal)], user='root')
        retries = int(stop_start_wait_sec / 0.5)
        local_counter = 0
        # wait stop
        while local_counter < retries:
            if not self.get_process_pid("clickhouse server"):
                break
            time.sleep(0.5)
            local_counter += 1

        # force kill if server hangs
        if self.get_process_pid("clickhouse server"):
            self.exec_in_container(["bash", "-c", "pkill -{} clickhouse".format(9)], user='root')

        if callback_onstop:
            callback_onstop(self)
        self.exec_in_container(
            ["bash", "-c", "cp /usr/share/clickhouse_fresh /usr/bin/clickhouse && chmod 777 /usr/bin/clickhouse"],
            user='root')
        self.exec_in_container(["bash", "-c",
                                "cp /usr/share/clickhouse-odbc-bridge_fresh /usr/bin/clickhouse-odbc-bridge && chmod 777 /usr/bin/clickhouse-odbc-bridge"],
                               user='root')
        self.exec_in_container(["bash", "-c", "{} --daemon".format(CLICKHOUSE_START_COMMAND)], user=str(os.getuid()))
        from helpers.test_tools import assert_eq_with_retry
        # wait start
        assert_eq_with_retry(self, "select 1", "1", retry_count=retries)

    def get_docker_handle(self):
        return self.docker_client.containers.get(self.docker_id)

    def stop(self):
        self.get_docker_handle().stop()

    def start(self):
        self.get_docker_handle().start()

    def wait_for_start(self, deadline=None, timeout=None):
        start_time = time.time()

        if timeout is not None:
            deadline = start_time + timeout

        while True:
            handle = self.get_docker_handle()
            status = handle.status
            if status == 'exited':
                raise Exception(
                    "Instance `{}' failed to start. Container status: {}, logs: {}".format(self.name, status,
                                                                                           handle.logs()))

            current_time = time.time()
            time_left = deadline - current_time if deadline is not None else None
            if deadline is not None and current_time >= deadline:
                raise Exception("Timed out while waiting for instance `{}' with ip address {} to start. "
                                "Container status: {}".format(self.name, self.ip_address, status))

            # Repeatedly poll the instance address until there is something that listens there.
            # Usually it means that ClickHouse is ready to accept queries.
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(time_left)
                sock.connect((self.ip_address, 9000))
                return
            except socket.timeout:
                continue
            except socket.error as e:
                if e.errno == errno.ECONNREFUSED:
                    time.sleep(0.1)
                else:
                    raise
            finally:
                sock.close()

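    # Renders a dict as a ClickHouse-style config snippet wrapped in a <yandex> root. For example,
    # dict_to_xml({"macros": {"shard": "1"}}) is expected to produce (pretty-printed) roughly:
    #     <yandex><macros><shard>1</shard></macros></yandex>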
    @staticmethod
    def dict_to_xml(dictionary):
        xml_str = dicttoxml(dictionary, custom_root="yandex", attr_type=False)
        return xml.dom.minidom.parseString(xml_str).toprettyxml()

    @property
    def odbc_drivers(self):
        if self.odbc_ini_path:
            return {
                "SQLite3": {
                    "DSN": "sqlite3_odbc",
                    "Database": "/tmp/sqliteodbc",
                    "Driver": "/usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so",
                    "Setup": "/usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so",
                },
                "MySQL": {
                    "DSN": "mysql_odbc",
                    "Driver": "/usr/lib/x86_64-linux-gnu/odbc/libmyodbc.so",
                    "Database": "clickhouse",
                    "Uid": "root",
                    "Pwd": "clickhouse",
                    "Server": "mysql1",
                },
                "PostgreSQL": {
                    "DSN": "postgresql_odbc",
                    "Database": "postgres",
                    "UserName": "postgres",
                    "Password": "mysecretpassword",
                    "Port": "5432",
                    "Servername": "postgres1",
                    "Protocol": "9.3",
                    "ReadOnly": "No",
                    "RowVersioning": "No",
                    "ShowSystemTables": "No",
                    "Driver": "/usr/lib/x86_64-linux-gnu/odbc/psqlodbca.so",
                    "Setup": "/usr/lib/x86_64-linux-gnu/odbc/libodbcpsqlS.so",
                    "ConnSettings": "",
                }
            }
        else:
            return {}

    def _create_odbc_config_file(self):
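        # Writes odbc.ini next to the docker-compose file; each entry of odbc_drivers becomes a section,
        # e.g. (abbreviated sketch):
        #     [mysql_odbc]
        #     Driver=/usr/lib/x86_64-linux-gnu/odbc/libmyodbc.so
        #     Server=mysql1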
        with open(self.odbc_ini_path.split(':')[0], 'w') as f:
            for driver_setup in self.odbc_drivers.values():
                f.write("[{}]\n".format(driver_setup["DSN"]))
                for key, value in driver_setup.items():
                    if key != "DSN":
                        f.write(key + "=" + value + "\n")

    def replace_config(self, path_to_config, replacement):
        self.exec_in_container(["bash", "-c", "echo '{}' > {}".format(replacement, path_to_config)])

    def create_dir(self, destroy_dir=True):
        """Create the instance directory and all the needed files there."""

        if destroy_dir:
            self.destroy_dir()
        elif p.exists(self.path):
            return

        os.makedirs(self.path)

        configs_dir = p.abspath(p.join(self.path, 'configs'))
        os.mkdir(configs_dir)

        shutil.copy(p.join(self.base_configs_dir, 'config.xml'), configs_dir)
        shutil.copy(p.join(self.base_configs_dir, 'users.xml'), configs_dir)

        # used by all utils with any config
        conf_d_dir = p.abspath(p.join(configs_dir, 'conf.d'))
        # used by server with main config.xml
        config_d_dir = p.abspath(p.join(configs_dir, 'config.d'))
        users_d_dir = p.abspath(p.join(configs_dir, 'users.d'))
        os.mkdir(conf_d_dir)
        os.mkdir(config_d_dir)
        os.mkdir(users_d_dir)

        # The file is named with 0_ prefix to be processed before other configuration overloads.
        shutil.copy(p.join(HELPERS_DIR, '0_common_instance_config.xml'), config_d_dir)

        # Generate and write macros file
        macros = self.macros.copy()
        macros['instance'] = self.name
        with open(p.join(config_d_dir, 'macros.xml'), 'w') as macros_config:
            macros_config.write(self.dict_to_xml({"macros": macros}))

        # Put ZooKeeper config
        if self.with_zookeeper:
            shutil.copy(self.zookeeper_config_path, conf_d_dir)

        # Copy config dir
        if self.custom_config_dir:
            distutils.dir_util.copy_tree(self.custom_config_dir, configs_dir)

        # Copy config.d configs
        for path in self.custom_main_config_paths:
            shutil.copy(path, config_d_dir)

        # Copy users.d configs
        for path in self.custom_user_config_paths:
            shutil.copy(path, users_d_dir)

        db_dir = p.abspath(p.join(self.path, 'database'))
        os.mkdir(db_dir)
        if self.clickhouse_path_dir is not None:
            distutils.dir_util.copy_tree(self.clickhouse_path_dir, db_dir)

        logs_dir = p.abspath(p.join(self.path, 'logs'))
        os.mkdir(logs_dir)

        depends_on = []

        if self.with_mysql:
            depends_on.append("mysql1")

        if self.with_kafka:
            depends_on.append("kafka1")

        if self.with_zookeeper:
            depends_on.append("zoo1")
            depends_on.append("zoo2")
            depends_on.append("zoo3")

        if self.with_minio:
            depends_on.append("minio1")
            depends_on.append("redirect")

        env_file = _create_env_file(os.path.dirname(self.docker_compose_path), self.env_variables)

        odbc_ini_path = ""
        if self.odbc_ini_path:
            self._create_odbc_config_file()
            odbc_ini_path = '- ' + self.odbc_ini_path

        entrypoint_cmd = CLICKHOUSE_START_COMMAND

        if self.stay_alive:
            entrypoint_cmd = CLICKHOUSE_STAY_ALIVE_COMMAND

        networks = app_net = ipv4_address = ipv6_address = net_aliases = net_alias1 = ""
        if self.ipv4_address is not None or self.ipv6_address is not None or self.hostname != self.name:
            networks = "networks:"
            app_net = "default:"
            if self.ipv4_address is not None:
                ipv4_address = "ipv4_address: " + self.ipv4_address
            if self.ipv6_address is not None:
                ipv6_address = "ipv6_address: " + self.ipv6_address
            if self.hostname != self.name:
                net_aliases = "aliases:"
                net_alias1 = "- " + self.hostname

        if not self.with_installed_binary:
            binary_volume = "- " + self.server_bin_path + ":/usr/bin/clickhouse"
            odbc_bridge_volume = "- " + self.odbc_bridge_bin_path + ":/usr/bin/clickhouse-odbc-bridge"
        else:
            binary_volume = "- " + self.server_bin_path + ":/usr/share/clickhouse_fresh"
            odbc_bridge_volume = "- " + self.odbc_bridge_bin_path + ":/usr/share/clickhouse-odbc-bridge_fresh"

        with open(self.docker_compose_path, 'w') as docker_compose:
            docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format(
                image=self.image,
                name=self.name,
                hostname=self.hostname,
                binary_volume=binary_volume,
                odbc_bridge_volume=odbc_bridge_volume,
                configs_dir=configs_dir,
                config_d_dir=config_d_dir,
                db_dir=db_dir,
                tmpfs=str(self.tmpfs),
                logs_dir=logs_dir,
                depends_on=str(depends_on),
                user=os.getuid(),
                env_file=env_file,
                odbc_ini_path=odbc_ini_path,
                entrypoint_cmd=entrypoint_cmd,
                networks=networks,
                app_net=app_net,
                ipv4_address=ipv4_address,
                ipv6_address=ipv6_address,
                net_aliases=net_aliases,
                net_alias1=net_alias1,
            ))

    def destroy_dir(self):
        if p.exists(self.path):
            shutil.rmtree(self.path)


class ClickHouseKiller(object):
    def __init__(self, clickhouse_node):
        self.clickhouse_node = clickhouse_node

    def __enter__(self):
        self.clickhouse_node.kill_clickhouse()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.clickhouse_node.restore_clickhouse()