cluster.py 45.3 KB
Newer Older
1 2 3
import base64
import distutils.dir_util
import errno
4 5
import os
import os.path as p
6
import pwd
7 8 9
import re
import shutil
import socket
10
import subprocess
11
import time
12
import urllib
13
import httplib
14
import requests
15
import xml.dom.minidom
16 17
import logging
import docker
18
import pprint
19
import psycopg2
A
alesapin 已提交
20
import pymongo
21 22 23 24 25
import pymysql
from dicttoxml import dicttoxml
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
from minio import Minio
26
from confluent.schemaregistry.client import CachedSchemaRegistryClient
27

28
from .client import Client
29
from .hdfs_api import HDFSApi
30 31

# Directory that contains this module; the helper docker-compose YAML files and the
# default zookeeper_config.xml are looked up relative to it.
HELPERS_DIR = p.dirname(__file__)
# Default file name used by _create_env_file() for the generated environment file.
DEFAULT_ENV_NAME = 'env_file'

# Marker looked for in the collected docker-compose logs during shutdown();
# a log line containing it is reported as a sanitizer assert.
SANITIZER_SIGN = "=================="

36

37 38 39 40 41 42 43
def _create_env_file(path, variables, fname=DEFAULT_ENV_NAME):
    """Write `variables` as one NAME=value line each into the file `fname` inside `path`.

    Returns the full path of the written file.
    """
    env_file_path = os.path.join(path, fname)
    with open(env_file_path, 'w') as env_file:
        env_file.writelines("=".join((key, val)) + "\n" for key, val in variables.items())
    return env_file_path

44

P
proller 已提交
45 46 47 48 49
def subprocess_check_call(args):
    """Run `args` as a child process; subprocess.check_call raises CalledProcessError on failure.

    Debugging aid: temporarily add  print('run:', ' '.join(args))  before the call.
    """
    subprocess.check_call(args)

50

P
proller 已提交
51 52 53 54 55
def subprocess_call(args):
    """Run `args` as a child process and return its exit status without raising on failure.

    The exit status must be returned: callers test it (e.g. `if not subprocess_call(...)`),
    and silently returning None made every such check evaluate as "succeeded".

    Debugging aid: temporarily add  print('run:', ' '.join(args))  before the call.
    """
    return subprocess.call(args)

56

57 58 59 60 61 62 63 64 65 66 67
def get_odbc_bridge_path():
    """Return the path of the clickhouse-odbc-bridge binary.

    Resolution order:
      1. the CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH environment variable, if set;
      2. 'clickhouse-odbc-bridge' in the directory of CLICKHOUSE_TESTS_SERVER_BIN_PATH, if set;
      3. '/usr/bin/clickhouse-odbc-bridge'.
    """
    explicit_path = os.environ.get('CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH')
    if explicit_path is not None:
        return explicit_path

    server_binary = os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH')
    if server_binary is None:
        return '/usr/bin/clickhouse-odbc-bridge'
    return os.path.join(os.path.dirname(server_binary), 'clickhouse-odbc-bridge')


68 69 70 71 72 73 74 75 76
class ClickHouseCluster:
    """ClickHouse cluster with several instances and (possibly) ZooKeeper.

    Add instances with several calls to add_instance(), then start them with the start() call.

    Directories for instances are created in the directory of base_path. After cluster is started,
    these directories will contain logs, database files, docker-compose config, ClickHouse configs etc.
    """

77
    def __init__(self, base_path, name=None, base_configs_dir=None, server_bin_path=None, client_bin_path=None,
78
                 odbc_bridge_bin_path=None, zookeeper_config_path=None, custom_dockerd_host=None):
79
        self.base_dir = p.dirname(base_path)
80
        self.name = name if name is not None else ''
81

82 83 84 85
        self.base_configs_dir = base_configs_dir or os.environ.get('CLICKHOUSE_TESTS_BASE_CONFIG_DIR',
                                                                   '/etc/clickhouse-server/')
        self.server_bin_path = p.realpath(
            server_bin_path or os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH', '/usr/bin/clickhouse'))
86
        self.odbc_bridge_bin_path = p.realpath(odbc_bridge_bin_path or get_odbc_bridge_path())
87 88 89 90
        self.client_bin_path = p.realpath(
            client_bin_path or os.environ.get('CLICKHOUSE_TESTS_CLIENT_BIN_PATH', '/usr/bin/clickhouse-client'))
        self.zookeeper_config_path = p.join(self.base_dir, zookeeper_config_path) if zookeeper_config_path else p.join(
            HELPERS_DIR, 'zookeeper_config.xml')
91

92
        self.project_name = pwd.getpwuid(os.getuid()).pw_name + p.basename(self.base_dir) + self.name
93 94
        # docker-compose removes everything non-alphanumeric from project names so we do it too.
        self.project_name = re.sub(r'[^a-z0-9]', '', self.project_name.lower())
95
        self.instances_dir = p.join(self.base_dir, '_instances' + ('' if not self.name else '_' + self.name))
96
        self.docker_logs_path = p.join(self.instances_dir, 'docker.log')
97

98
        custom_dockerd_host = custom_dockerd_host or os.environ.get('CLICKHOUSE_TESTS_DOCKERD_HOST')
99
        self.docker_api_version = os.environ.get("DOCKER_API_VERSION")
100 101 102 103 104 105

        self.base_cmd = ['docker-compose']
        if custom_dockerd_host:
            self.base_cmd += ['--host', custom_dockerd_host]

        self.base_cmd += ['--project-directory', self.base_dir, '--project-name', self.project_name]
106
        self.base_zookeeper_cmd = None
S
sundy-li 已提交
107
        self.base_mysql_cmd = []
108
        self.base_kafka_cmd = []
109
        self.pre_zookeeper_commands = []
110 111
        self.instances = {}
        self.with_zookeeper = False
S
sundy-li 已提交
112
        self.with_mysql = False
113
        self.with_postgres = False
114
        self.with_kafka = False
A
alesapin 已提交
115
        self.with_odbc_drivers = False
116
        self.with_hdfs = False
A
alesapin 已提交
117
        self.with_mongo = False
118
        self.with_net_trics = False
A
alesapin 已提交
119
        self.with_redis = False
120

121 122 123 124 125 126 127 128
        self.with_minio = False
        self.minio_host = "minio1"
        self.minio_bucket = "root"
        self.minio_port = 9001
        self.minio_client = None  # type: Minio
        self.minio_redirect_host = "redirect"
        self.minio_redirect_port = 80

129 130 131 132 133
        # available when with_kafka == True
        self.schema_registry_client = None
        self.schema_registry_host = "schema-registry"
        self.schema_registry_port = 8081

134
        self.docker_client = None
135 136
        self.is_up = False

A
alesapin 已提交
137 138 139 140 141 142
    def get_client_cmd(self):
        cmd = self.client_bin_path
        if p.basename(cmd) == 'clickhouse':
            cmd += " client"
        return cmd

143
    def add_instance(self, name, config_dir=None, main_configs=None, user_configs=None, macros=None,
                     with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None,
                     with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False,
                     with_redis=False, with_minio=False,
                     hostname=None, env_variables=None, image="yandex/clickhouse-integration-test",
                     stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=None,
                     zookeeper_docker_compose_path=None):
        """Add an instance to the cluster.

        name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
        config_dir - a directory with config files which content will be copied to /etc/clickhouse-server/ directory
        main_configs - a list of config files that will be added to config.d/ directory
        user_configs - a list of config files that will be added to users.d/ directory
        with_zookeeper - if True, add ZooKeeper configuration to configs and ZooKeeper instances to the cluster.

        Each with_* flag registers the matching helper docker-compose file the first time it is
        requested for this cluster. Returns the created ClickHouseInstance.
        Raises Exception when the cluster is already up or the name is already taken.
        """

        if self.is_up:
            raise Exception("Can\'t add instance %s: cluster is already up!" % name)

        if name in self.instances:
            raise Exception("Can\'t add instance `%s': there is already an instance with the same name!" % name)

        instance = ClickHouseInstance(
            self, self.base_dir, name, config_dir, main_configs or [], user_configs or [], macros or {},
            with_zookeeper,
            self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio,
            self.base_configs_dir, self.server_bin_path,
            self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname,
            env_variables=env_variables or {}, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address,
            ipv6_address=ipv6_address,
            with_installed_binary=with_installed_binary, tmpfs=tmpfs or [])
        self.instances[name] = instance

        net_compose_file = p.join(HELPERS_DIR, 'docker_compose_net.yml')
        if not (ipv4_address is None and ipv6_address is None):
            self.with_net_trics = True
            self.base_cmd.extend(['--file', net_compose_file])

        self.base_cmd.extend(['--file', instance.docker_compose_path])

        # Per-service commands created by THIS call; only these get the network file below.
        new_service_cmds = []

        def register_compose_file(compose_file):
            # Add the file to the whole-cluster command and build a command for that file alone.
            self.base_cmd.extend(['--file', compose_file])
            return ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                    self.project_name, '--file', compose_file]

        def enable_mysql():
            self.with_mysql = True
            self.base_mysql_cmd = register_compose_file(p.join(HELPERS_DIR, 'docker_compose_mysql.yml'))
            new_service_cmds.append(self.base_mysql_cmd)

        def enable_postgres():
            self.with_postgres = True
            self.base_postgres_cmd = register_compose_file(p.join(HELPERS_DIR, 'docker_compose_postgres.yml'))
            new_service_cmds.append(self.base_postgres_cmd)

        if with_zookeeper and not self.with_zookeeper:
            if not zookeeper_docker_compose_path:
                zookeeper_docker_compose_path = p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')

            self.with_zookeeper = True
            self.base_zookeeper_cmd = register_compose_file(zookeeper_docker_compose_path)
            new_service_cmds.append(self.base_zookeeper_cmd)

        if with_mysql and not self.with_mysql:
            enable_mysql()

        if with_postgres and not self.with_postgres:
            enable_postgres()

        # ODBC drivers pull in both MySQL and Postgres unless they are already enabled.
        if with_odbc_drivers and not self.with_odbc_drivers:
            self.with_odbc_drivers = True
            if not self.with_mysql:
                enable_mysql()
            if not self.with_postgres:
                enable_postgres()

        if with_kafka and not self.with_kafka:
            self.with_kafka = True
            self.base_kafka_cmd = register_compose_file(p.join(HELPERS_DIR, 'docker_compose_kafka.yml'))
            new_service_cmds.append(self.base_kafka_cmd)

        if with_hdfs and not self.with_hdfs:
            self.with_hdfs = True
            self.base_hdfs_cmd = register_compose_file(p.join(HELPERS_DIR, 'docker_compose_hdfs.yml'))
            new_service_cmds.append(self.base_hdfs_cmd)

        if with_mongo and not self.with_mongo:
            self.with_mongo = True
            self.base_mongo_cmd = register_compose_file(p.join(HELPERS_DIR, 'docker_compose_mongo.yml'))
            new_service_cmds.append(self.base_mongo_cmd)

        # Applied before Redis and Minio are handled, so their commands never receive the
        # network file. NOTE(review): confirm whether that ordering is intentional.
        if self.with_net_trics:
            for service_cmd in new_service_cmds:
                service_cmd.extend(['--file', net_compose_file])

        if with_redis and not self.with_redis:
            self.with_redis = True
            self.base_redis_cmd = register_compose_file(p.join(HELPERS_DIR, 'docker_compose_redis.yml'))

        if with_minio and not self.with_minio:
            self.with_minio = True
            self.base_minio_cmd = register_compose_file(p.join(HELPERS_DIR, 'docker_compose_minio.yml'))
            new_service_cmds.append(self.base_minio_cmd)

        return instance

265 266 267 268
    def get_instance_docker_id(self, instance_name):
        # According to how docker-compose names containers.
        return self.project_name + '_' + instance_name + '_1'

A
alesapin 已提交
269 270 271 272 273 274 275 276 277 278
    def _replace(self, path, what, to):
        with open(path, 'r') as p:
            data = p.read()
        data = data.replace(what, to)
        with open(path, 'w') as p:
            p.write(data)

    def restart_instance_with_ip_change(self, node, new_ip):
        if '::' in new_ip:
            if node.ipv6_address is None:
V
Vladimir Chebotarev 已提交
279
                raise Exception("You should specity ipv6_address in add_node method")
A
alesapin 已提交
280 281 282 283
            self._replace(node.docker_compose_path, node.ipv6_address, new_ip)
            node.ipv6_address = new_ip
        else:
            if node.ipv4_address is None:
V
Vladimir Chebotarev 已提交
284
                raise Exception("You should specity ipv4_address in add_node method")
A
alesapin 已提交
285 286 287 288 289 290 291
            self._replace(node.docker_compose_path, node.ipv4_address, new_ip)
            node.ipv4_address = new_ip
        subprocess.check_call(self.base_cmd + ["stop", node.name])
        subprocess.check_call(self.base_cmd + ["rm", "--force", "--stop", node.name])
        subprocess.check_call(self.base_cmd + ["up", "--force-recreate", "--no-deps", "-d", node.name])
        node.ip_address = self.get_instance_ip(node.name)
        node.client = Client(node.ip_address, command=self.client_bin_path)
292
        start_deadline = time.time() + 20.0  # seconds
A
alesapin 已提交
293 294
        node.wait_for_start(start_deadline)
        return node
295

296 297 298 299 300
    def get_instance_ip(self, instance_name):
        docker_id = self.get_instance_docker_id(instance_name)
        handle = self.docker_client.containers.get(docker_id)
        return handle.attrs['NetworkSettings']['Networks'].values()[0]['IPAddress']

A
alesapin 已提交
301 302 303 304 305 306 307 308
    def wait_mysql_to_start(self, timeout=60):
        """Poll MySQL on 127.0.0.1:3308 every 0.5 s until a connection succeeds.

        timeout - maximum time to keep polling, in seconds.

        Raises Exception when no connection could be made in time; the docker-compose
        service list is printed first.
        """
        began_at = time.time()
        while time.time() - began_at < timeout:
            try:
                pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=3308).close()
                print("Mysql Started")
                return
            except Exception as ex:
                print("Can't connect to MySQL " + str(ex))
                time.sleep(0.5)

        subprocess_call(['docker-compose', 'ps', '--services', '--all'])
        raise Exception("Cannot wait MySQL container")

316 317 318 319 320 321 322 323 324 325 326 327 328 329 330
    def wait_postgres_to_start(self, timeout=60):
        """Poll Postgres on localhost every 0.5 s until a connection succeeds.

        timeout - maximum time to keep polling, in seconds.

        Raises Exception when no connection could be made in time.
        """
        dsn = "host='localhost' user='postgres' password='mysecretpassword'"
        began_at = time.time()
        while time.time() - began_at < timeout:
            try:
                psycopg2.connect(dsn).close()
                print("Postgres Started")
                return
            except Exception as ex:
                print("Can't connect to Postgres " + str(ex))
                time.sleep(0.5)

        raise Exception("Cannot wait Postgres container")

331 332 333 334 335 336 337
    def wait_zookeeper_to_start(self, timeout=60):
        start = time.time()
        while time.time() - start < timeout:
            try:
                for instance in ['zoo1', 'zoo2', 'zoo3']:
                    conn = self.get_kazoo_client(instance)
                    conn.get_children('/')
338
                print "All instances of ZooKeeper started"
339
                return
A
alesapin 已提交
340
            except Exception as ex:
341
                print "Can't connect to ZooKeeper " + str(ex)
342 343 344
                time.sleep(0.5)

        raise Exception("Cannot wait ZooKeeper container")
345

346 347 348 349 350 351 352 353 354 355 356 357 358 359
    def wait_hdfs_to_start(self, timeout=60):
        """Retry a small test write to HDFS once per second until it succeeds.

        timeout - maximum time to keep retrying, in seconds.

        Raises Exception when the write never succeeded in time.
        """
        api = HDFSApi("root")
        began_at = time.time()
        while time.time() - began_at < timeout:
            try:
                api.write_data("/somefilewithrandomname222", "1")
                print("Connected to HDFS and SafeMode disabled! ")
                return
            except Exception as ex:
                print("Can't connect to HDFS " + str(ex))
                time.sleep(1)

        raise Exception("Can't wait HDFS to start")

A
alesapin 已提交
360 361 362 363 364 365 366 367 368 369 370 371 372 373
    def wait_mongo_to_start(self, timeout=30):
        """Poll MongoDB on localhost:27018 once per second until its databases can be listed.

        timeout - maximum time to keep polling, in seconds.

        NOTE(review): on timeout this returns silently instead of raising, unlike the
        MySQL/Postgres/ZooKeeper/HDFS waiters - confirm whether that is intended.
        """
        mongo_uri = 'mongodb://{user}:{password}@{host}:{port}'.format(
            host='localhost', port='27018', user='root', password='clickhouse')
        client = pymongo.MongoClient(mongo_uri)
        began_at = time.time()
        while time.time() - began_at < timeout:
            try:
                client.database_names()
                print("Connected to Mongo dbs: " + str(client.database_names()))
                return
            except Exception as ex:
                print("Can't connect to Mongo " + str(ex))
                time.sleep(1)

374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
    def wait_minio_to_start(self, timeout=10):
        """Poll Minio on localhost:9001 once per second until its buckets can be listed.

        On success the connected client is stored in `self.minio_client`.

        timeout - maximum time to keep polling, in seconds.

        Raises Exception when Minio did not answer in time. Previously the method returned
        silently and left `self.minio_client` as None, so the failure only surfaced later.
        """
        minio_client = Minio('localhost:9001',
                             access_key='minio',
                             secret_key='minio123',
                             secure=False)
        start = time.time()
        while time.time() - start < timeout:
            try:
                buckets = minio_client.list_buckets()
                self.minio_client = minio_client
                logging.info("Connected to Minio %s", buckets)
                return
            except Exception as ex:
                logging.warning("Can't connect to Minio: %s", str(ex))
                time.sleep(1)

        raise Exception("Can't wait Minio to start")

390 391 392 393 394 395 396 397 398 399 400 401 402
    def wait_schema_registry_to_start(self, timeout=10):
        """Poll the schema registry at http://localhost:8081 once per second until it answers.

        On success the client is stored in `self.schema_registry_client`.

        timeout - maximum time to keep polling, in seconds.

        Raises Exception when the registry did not answer in time. Previously the method
        returned silently and left `self.schema_registry_client` as None.
        """
        sr_client = CachedSchemaRegistryClient('http://localhost:8081')
        start = time.time()
        while time.time() - start < timeout:
            try:
                sr_client._send_request(sr_client.url)
                self.schema_registry_client = sr_client
                logging.info("Connected to SchemaRegistry")
                return
            except Exception as ex:
                logging.warning("Can't connect to SchemaRegistry: %s", str(ex))
                time.sleep(1)

        raise Exception("Can't wait Schema Registry to start")

403 404 405 406
    def start(self, destroy_dirs=True):
        if self.is_up:
            return

407
        # Just in case kill unstopped containers from previous launch
408
        try:
409 410
            logging.info("Trying to kill unstopped containers...")

P
proller 已提交
411 412
            if not subprocess_call(['docker-compose', 'kill']):
                subprocess_call(['docker-compose', 'down', '--volumes'])
413
            logging.info("Unstopped containers killed")
414 415 416
        except:
            pass

417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483
        try:
            if destroy_dirs and p.exists(self.instances_dir):
                logging.info("Removing instances dir %s", self.instances_dir)
                shutil.rmtree(self.instances_dir)

            for instance in self.instances.values():
                instance.create_dir(destroy_dir=destroy_dirs)

            self.docker_client = docker.from_env(version=self.docker_api_version)

            common_opts = ['up', '-d', '--force-recreate']

            if self.with_zookeeper and self.base_zookeeper_cmd:
                subprocess_check_call(self.base_zookeeper_cmd + common_opts)
                for command in self.pre_zookeeper_commands:
                    self.run_kazoo_commands_with_retries(command, repeats=5)
                self.wait_zookeeper_to_start(120)

            if self.with_mysql and self.base_mysql_cmd:
                subprocess_check_call(self.base_mysql_cmd + common_opts)
                self.wait_mysql_to_start(120)

            if self.with_postgres and self.base_postgres_cmd:
                subprocess_check_call(self.base_postgres_cmd + common_opts)
                self.wait_postgres_to_start(120)

            if self.with_kafka and self.base_kafka_cmd:
                subprocess_check_call(self.base_kafka_cmd + common_opts + ['--renew-anon-volumes'])
                self.kafka_docker_id = self.get_instance_docker_id('kafka1')
                self.wait_schema_registry_to_start(120)

            if self.with_hdfs and self.base_hdfs_cmd:
                subprocess_check_call(self.base_hdfs_cmd + common_opts)
                self.wait_hdfs_to_start(120)

            if self.with_mongo and self.base_mongo_cmd:
                subprocess_check_call(self.base_mongo_cmd + common_opts)
                self.wait_mongo_to_start(30)

            if self.with_redis and self.base_redis_cmd:
                subprocess_check_call(self.base_redis_cmd + ['up', '-d', '--force-recreate'])
                time.sleep(10)

            if self.with_minio and self.base_minio_cmd:
                minio_start_cmd = self.base_minio_cmd + common_opts
                logging.info("Trying to create Minio instance by command %s", ' '.join(map(str, minio_start_cmd)))
                subprocess_check_call(minio_start_cmd)
                logging.info("Trying to connect to Minio...")
                self.wait_minio_to_start()

            clickhouse_start_cmd = self.base_cmd + ['up', '-d', '--no-recreate']
            logging.info("Trying to create ClickHouse instance by command %s", ' '.join(map(str, clickhouse_start_cmd)))
            subprocess_check_call(clickhouse_start_cmd)
            logging.info("ClickHouse instance created")

            start_deadline = time.time() + 20.0  # seconds
            for instance in self.instances.itervalues():
                instance.docker_client = self.docker_client
                instance.ip_address = self.get_instance_ip(instance.name)

                logging.info("Waiting for ClickHouse start...")
                instance.wait_for_start(start_deadline)
                logging.info("ClickHouse started")

                instance.client = Client(instance.ip_address, command=self.client_bin_path)

            self.is_up = True
484

485 486 487 488
        except BaseException, e:
            print "Failed to start cluster: "
            print str(e)
            raise
489 490

    def shutdown(self, kill=True):
        """Collect the docker-compose logs, stop the cluster and reset the per-instance state.

        kill - when True, the containers are killed before `docker-compose down`.

        Raises Exception after the teardown when a line containing SANITIZER_SIGN was found
        in the collected logs; the text before the first '|' of that line is reported as the
        instance name.
        """
        sanitizer_assert_instance = None
        try:
            with open(self.docker_logs_path, "w+") as f:
                subprocess.check_call(self.base_cmd + ['logs'], stdout=f)
                f.seek(0)
                for line in f:
                    if SANITIZER_SIGN in line:
                        sanitizer_assert_instance = line.split('|')[0].strip()
                        break
        except (IOError, OSError, subprocess.CalledProcessError) as ex:
            # Log collection is diagnostic only: failing to write or fetch the logs must not
            # leave the containers running, so carry on with the teardown.
            logging.warning("Unable to collect docker logs into %s: %s", self.docker_logs_path, str(ex))

        if kill:
            subprocess_check_call(self.base_cmd + ['kill'])
        subprocess_check_call(self.base_cmd + ['down', '--volumes', '--remove-orphans'])

        self.is_up = False

        self.docker_client = None

        for instance in self.instances.values():
            instance.docker_client = None
            instance.ip_address = None
            instance.client = None

        if sanitizer_assert_instance is not None:
            raise Exception("Sanitizer assert found in {} for instance {}".format(self.docker_logs_path, sanitizer_assert_instance))

516 517 518 519 520 521 522
    def pause_container(self, instance_name):
        """Pause the service `instance_name` with `docker-compose pause`.

        A disabled alternative from the original code sent SIGSTOP through
        `docker-compose kill -s SIGSTOP` instead.
        """
        pause_cmd = self.base_cmd + ['pause', instance_name]
        subprocess_check_call(pause_cmd)

    def unpause_container(self, instance_name):
        """Resume the service `instance_name` with `docker-compose unpause`.

        A disabled alternative from the original code sent SIGCONT through
        `docker-compose kill -s SIGCONT` instead.
        """
        unpause_cmd = self.base_cmd + ['unpause', instance_name]
        subprocess_check_call(unpause_cmd)
523

M
Mikhail Filimonov 已提交
524 525
    def open_bash_shell(self, instance_name):
        """Run /bin/bash inside the container of `instance_name` via `docker-compose exec`.

        The command is joined with spaces and handed to the system shell unquoted.
        """
        shell_cmd = self.base_cmd + ['exec', instance_name, '/bin/bash']
        os.system(' '.join(shell_cmd))
526

527 528 529 530 531
    def get_kazoo_client(self, zoo_instance_name):
        """Return a started KazooClient connected to the IP of `zoo_instance_name`.

        A new client is created on every call; stopping it is left to the caller.
        """
        client = KazooClient(hosts=self.get_instance_ip(zoo_instance_name))
        client.start()
        return client

532
    def run_kazoo_commands_with_retries(self, kazoo_callback, zoo_instance_name='zoo1', repeats=1, sleep_for=1):
533
        for i in range(repeats - 1):
534
            try:
535 536 537 538
                kazoo_callback(self.get_kazoo_client(zoo_instance_name))
                return
            except KazooException as e:
                print repr(e)
539 540
                time.sleep(sleep_for)

541 542
        kazoo_callback(self.get_kazoo_client(zoo_instance_name))

543
    def add_zookeeper_startup_command(self, command):
544
        self.pre_zookeeper_commands.append(command)
545 546


A
alesapin 已提交
547 548 549 550
# Command line that starts the ClickHouse server with its config, log and error-log paths.
CLICKHOUSE_START_COMMAND = (
    "clickhouse server --config-file=/etc/clickhouse-server/config.xml"
    " --log-file=/var/log/clickhouse-server/clickhouse-server.log"
    " --errorlog-file=/var/log/clickhouse-server/clickhouse-server.err.log"
)

# Starts the server with --daemon and then blocks on `tail -f /dev/null`.
CLICKHOUSE_STAY_ALIVE_COMMAND = 'bash -c "' + CLICKHOUSE_START_COMMAND + ' --daemon; tail -f /dev/null"'

551
DOCKER_COMPOSE_TEMPLATE = '''
552
version: '2.2'
553 554
services:
    {name}:
A
alesapin 已提交
555
        image: {image}
556
        hostname: {hostname}
557 558 559 560
        volumes:
            - {configs_dir}:/etc/clickhouse-server/
            - {db_dir}:/var/lib/clickhouse/
            - {logs_dir}:/var/log/clickhouse-server/
561 562
            {binary_volume}
            {odbc_bridge_volume}
A
alesapin 已提交
563
            {odbc_ini_path}
A
alesapin 已提交
564
        entrypoint: {entrypoint_cmd}
565
        tmpfs: {tmpfs}
A
alesapin 已提交
566 567
        cap_add:
            - SYS_PTRACE
568
        depends_on: {depends_on}
A
alesapin 已提交
569
        user: '{user}'
570 571
        env_file:
            - {env_file}
A
alesapin 已提交
572 573
        security_opt:
            - label:disable
574 575 576 577
        {networks}
            {app_net}
                {ipv4_address}
                {ipv6_address}
578 579
                {net_aliases}
                    {net_alias1}
580 581
'''

A
alesapin 已提交
582

583
class ClickHouseInstance:
    """One ClickHouse server of a test cluster, described by its own docker-compose file.

    The object prepares the instance directory (configs, database dir, logs dir,
    docker-compose file), talks to the server through `self.client` and over
    HTTP, and controls the server process through the docker API.

    `docker_client`, `ip_address` and `client` are initialised to None here.
    NOTE(review): presumably the owning cluster fills them in when it starts -
    that code is not part of this class, confirm.
    """

    def __init__(
            self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros,
            with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio,
            base_configs_dir, server_bin_path, odbc_bridge_bin_path,
            clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables=None,
            image="yandex/clickhouse-integration-test",
            stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=None):
        """Store the instance settings; nothing is created on disk or in docker here.

        Relative paths (`custom_config_dir`, `custom_main_configs`,
        `custom_user_configs`, `clickhouse_path_dir`) are resolved against
        `base_path`. `hostname` defaults to `name`. Passing a true
        `with_odbc_drivers` also forces `with_mysql` to True.
        """
        self.name = name
        self.base_cmd = cluster.base_cmd
        self.docker_id = cluster.get_instance_docker_id(self.name)
        self.cluster = cluster
        self.hostname = hostname if hostname is not None else self.name

        self.tmpfs = tmpfs or []
        self.custom_config_dir = p.abspath(p.join(base_path, custom_config_dir)) if custom_config_dir else None
        self.custom_main_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_main_configs]
        self.custom_user_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_user_configs]
        self.clickhouse_path_dir = p.abspath(p.join(base_path, clickhouse_path_dir)) if clickhouse_path_dir else None
        self.macros = macros if macros is not None else {}
        self.with_zookeeper = with_zookeeper
        self.zookeeper_config_path = zookeeper_config_path

        self.base_configs_dir = base_configs_dir
        self.server_bin_path = server_bin_path
        self.odbc_bridge_bin_path = odbc_bridge_bin_path

        self.with_mysql = with_mysql
        self.with_kafka = with_kafka
        self.with_mongo = with_mongo
        self.with_redis = with_redis
        self.with_minio = with_minio

        self.path = p.join(self.cluster.instances_dir, name)
        self.docker_compose_path = p.join(self.path, 'docker_compose.yml')
        self.env_variables = env_variables or {}
        if with_odbc_drivers:
            # "host path:container path" pair, later emitted as a compose volume entry.
            self.odbc_ini_path = os.path.dirname(self.docker_compose_path) + "/odbc.ini:/etc/odbc.ini"
            self.with_mysql = True
        else:
            self.odbc_ini_path = ""

        self.docker_client = None
        self.ip_address = None
        self.client = None
        self.default_timeout = 20.0  # 20 sec
        self.image = image
        self.stay_alive = stay_alive
        self.ipv4_address = ipv4_address
        self.ipv6_address = ipv6_address
        self.with_installed_binary = with_installed_binary

    def query(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None, ignore_error=False):
        """Connect to the instance via clickhouse-client, send a query (1st argument) and return the answer."""
        return self.client.query(sql, stdin, timeout, settings, user, password, ignore_error)

    def query_with_retry(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None, ignore_error=False,
                         retry_count=20, sleep_time=0.5, check_callback=lambda x: True):
        """Run `query()` up to `retry_count` times until `check_callback(result)` is true.

        Exceptions raised by an attempt are printed and swallowed; `sleep_time`
        seconds pass between attempts. If no attempt satisfied the callback,
        the last obtained result is returned when there is one; otherwise an
        Exception is raised.
        """
        result = None
        for i in range(retry_count):
            try:
                result = self.query(sql, stdin, timeout, settings, user, password, ignore_error)
                if check_callback(result):
                    return result
                time.sleep(sleep_time)
            except Exception as ex:
                # Call form prints the same text on Python 2 and stays valid on Python 3.
                print("Retry {} got exception {}".format(i + 1, ex))
                time.sleep(sleep_time)

        if result is not None:
            return result
        raise Exception("Can't execute query {}".format(sql))

    def get_query_request(self, *args, **kwargs):
        """As query() but doesn't wait for the response and returns a response handler."""
        return self.client.get_query_request(*args, **kwargs)

    def query_and_get_error(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None):
        """Connect via clickhouse-client, send a query (1st argument), expect an error and return it."""
        return self.client.query_and_get_error(sql, stdin, timeout, settings, user, password)

    def query_and_get_answer_with_error(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None):
        """The same as query_and_get_error but ignores a successful query.

        Delegates to `self.client.query_and_get_answer_with_error`.
        """
        return self.client.query_and_get_answer_with_error(sql, stdin, timeout, settings, user, password)

    def http_query(self, sql, data=None, params=None, user=None, password=None, expect_fail_and_get_error=False):
        """Connect to the instance via the HTTP interface (port 8123), send a query and return the answer.

        `params` is copied before the "query" key is added, so the caller's
        dict is not modified. With `expect_fail_and_get_error` the call raises
        if the server answers 200 and otherwise returns "<code> <reason>: <body>";
        without it the call raises on any non-200 answer and returns the body.
        """
        if params is None:
            params = {}
        else:
            params = params.copy()

        params["query"] = sql

        auth = ""
        if user and password:
            auth = "{}:{}@".format(user, password)
        elif user:
            auth = "{}@".format(user)

        url = "http://" + auth + self.ip_address + ":8123/?" + urllib.urlencode(params)

        open_result = urllib.urlopen(url, data)

        def http_code_and_message():
            return str(open_result.getcode()) + " " + httplib.responses[open_result.getcode()] + ": " + open_result.read()

        if expect_fail_and_get_error:
            if open_result.getcode() == 200:
                raise Exception("ClickHouse HTTP server is expected to fail, but succeeded: " + open_result.read())
            return http_code_and_message()
        else:
            if open_result.getcode() != 200:
                raise Exception("ClickHouse HTTP server returned " + http_code_and_message())
            return open_result.read()

    def http_request(self, url, method='GET', params=None, data=None):
        """Send an HTTP request to `url` (relative to port 8123 of the instance) and return the response body."""
        url = "http://" + self.ip_address + ":8123/" + url
        return requests.request(method=method, url=url, params=params, data=data).content

    def http_query_and_get_error(self, sql, data=None, params=None, user=None, password=None):
        """Connect via the HTTP interface, send a query, expect an error and return the error message."""
        return self.http_query(sql=sql, data=data, params=params, user=user, password=password, expect_fail_and_get_error=True)

    def _start_clickhouse_daemon(self):
        """Start the server with --daemon inside the container, as the current host user id."""
        self.exec_in_container(["bash", "-c", "{} --daemon".format(CLICKHOUSE_START_COMMAND)], user=str(os.getuid()))

    def kill_clickhouse(self, stop_start_wait_sec=5):
        """Send `kill -9` to the clickhouse process, then sleep `stop_start_wait_sec` seconds.

        Raises Exception if no clickhouse process is found.
        """
        pid = self.get_process_pid("clickhouse")
        if not pid:
            raise Exception("No clickhouse found")
        self.exec_in_container(["bash", "-c", "kill -9 {}".format(pid)], user='root')
        time.sleep(stop_start_wait_sec)

    def restore_clickhouse(self, retries=100):
        """Start the server again and wait until `select 1` answers.

        Raises Exception if a clickhouse process is already running.
        """
        pid = self.get_process_pid("clickhouse")
        if pid:
            raise Exception("ClickHouse has already started")
        self._start_clickhouse_daemon()
        from helpers.test_tools import assert_eq_with_retry
        # wait start
        assert_eq_with_retry(self, "select 1", "1", retry_count=retries)

    def restart_clickhouse(self, stop_start_wait_sec=5, kill=False):
        """Stop the server with pkill (`-9` when `kill` is true), wait, and start it again.

        Only allowed for instances created with stay_alive=True; raises
        Exception otherwise.
        """
        if not self.stay_alive:
            raise Exception("clickhouse can be restarted only with stay_alive=True instance")

        self.exec_in_container(["bash", "-c", "pkill {} clickhouse".format("-9" if kill else "")], user='root')
        time.sleep(stop_start_wait_sec)
        self._start_clickhouse_daemon()
        # wait start
        from helpers.test_tools import assert_eq_with_retry
        assert_eq_with_retry(self, "select 1", "1", retry_count=int(stop_start_wait_sec / 0.5), sleep_time=0.5)

    def exec_in_container(self, cmd, detach=False, **kwargs):
        """Run `cmd` (a list of arguments) inside the instance's container and return its decoded output.

        Extra keyword arguments are forwarded to the docker `exec_create` call.
        On a non-zero exit code the container and image descriptions are
        printed and an Exception carrying the command, exit code and output
        is raised.
        """
        container = self.get_docker_handle()
        exec_id = self.docker_client.api.exec_create(container.id, cmd, **kwargs)
        output = self.docker_client.api.exec_start(exec_id, detach=detach)

        output = output.decode('utf8')
        exit_code = self.docker_client.api.exec_inspect(exec_id)['ExitCode']
        if exit_code:
            container_info = self.docker_client.api.inspect_container(container.id)
            image_id = container_info.get('Image')
            image_info = self.docker_client.api.inspect_image(image_id)
            print("Command failed in container {}: ".format(container.id))
            pprint.pprint(container_info)
            print("")
            print("Container {} uses image {}: ".format(container.id, image_id))
            pprint.pprint(image_info)
            print("")
            raise Exception('Cmd "{}" failed in container {}. Return code {}. Output: {}'.format(' '.join(cmd), container.id, exit_code, output))
        return output

    def contains_in_log(self, substring):
        """Return True if grep finds `substring` in the server log inside the container.

        `substring` is placed into a double-quoted grep pattern, so it is
        interpreted by both the shell and grep.
        """
        result = self.exec_in_container(
            ["bash", "-c", 'grep "{}" /var/log/clickhouse-server/clickhouse-server.log || true'.format(substring)])
        return len(result) > 0

    def copy_file_to_container(self, local_path, dest_path):
        """Write the contents of the local file `local_path` to `dest_path` inside the container (as root).

        The data travels base64-encoded on the command line of
        `echo ... | base64 --decode`.
        """
        # Binary mode keeps arbitrary file contents intact before encoding.
        with open(local_path, 'rb') as fdata:
            data = fdata.read()
        # b64encode returns bytes on Python 3; the payload is pure ASCII either way.
        encoded_data = base64.b64encode(data).decode('ascii')
        self.exec_in_container(["bash", "-c", "echo {} | base64 --decode > {}".format(encoded_data, dest_path)],
                               user='root')

    def get_process_pid(self, process_name):
        """Return the pid from the first `ps ax` line matching `process_name`, or None.

        Lines containing 'grep' or 'bash -c' are excluded from the match.
        """
        output = self.exec_in_container(["bash", "-c",
                                         "ps ax | grep '{}' | grep -v 'grep' | grep -v 'bash -c' | awk '{{print $1}}'".format(
                                             process_name)])
        if not output:
            return None
        try:
            return int(output.split('\n')[0].strip())
        except ValueError:
            # First line is not a number: treat as "no such process".
            return None

    def restart_with_latest_version(self, stop_start_wait_sec=10, callback_onstop=None, signal=15):
        """Stop the server, replace the binaries in /usr/bin with the "_fresh" ones and start it again.

        The server is signalled with `signal`; if a "clickhouse server" process
        is still present after about `stop_start_wait_sec` seconds it is killed
        with signal 9. `callback_onstop(self)` is called, if given, before the
        binaries are replaced. Raises Exception unless stay_alive is set.
        """
        if not self.stay_alive:
            raise Exception("Cannot restart not stay alive container")
        self.exec_in_container(["bash", "-c", "pkill -{} clickhouse".format(signal)], user='root')
        retries = int(stop_start_wait_sec / 0.5)
        local_counter = 0
        # wait stop
        while local_counter < retries:
            if not self.get_process_pid("clickhouse server"):
                break
            time.sleep(0.5)
            local_counter += 1

        # force kill if server hangs
        if self.get_process_pid("clickhouse server"):
            self.exec_in_container(["bash", "-c", "pkill -{} clickhouse".format(9)], user='root')

        if callback_onstop:
            callback_onstop(self)
        self.exec_in_container(
            ["bash", "-c", "cp /usr/share/clickhouse_fresh /usr/bin/clickhouse && chmod 777 /usr/bin/clickhouse"],
            user='root')
        # chmod must target the file that was just copied (the odbc bridge, not the server binary).
        self.exec_in_container(["bash", "-c",
                                "cp /usr/share/clickhouse-odbc-bridge_fresh /usr/bin/clickhouse-odbc-bridge && chmod 777 /usr/bin/clickhouse-odbc-bridge"],
                               user='root')
        self._start_clickhouse_daemon()
        from helpers.test_tools import assert_eq_with_retry
        # wait start
        assert_eq_with_retry(self, "select 1", "1", retry_count=retries)

    def get_docker_handle(self):
        """Return the docker container object looked up by `self.docker_id`."""
        return self.docker_client.containers.get(self.docker_id)

    def stop(self):
        """Stop the instance's container."""
        self.get_docker_handle().stop()

    def start(self):
        """Start the instance's container."""
        self.get_docker_handle().start()

    def wait_for_start(self, deadline=None, timeout=None):
        """Block until something accepts TCP connections on port 9000 of the instance.

        :param deadline: absolute time (as returned by time.time()) to give up at.
        :param timeout: seconds from now to give up at; overrides `deadline`.

        With neither argument the method waits indefinitely. Raises Exception
        if the container status is 'exited' or the deadline passes.
        """
        start_time = time.time()

        if timeout is not None:
            deadline = start_time + timeout

        while True:
            handle = self.get_docker_handle()
            status = handle.status
            if status == 'exited':
                raise Exception(
                    "Instance `{}' failed to start. Container status: {}, logs: {}".format(self.name, status,
                                                                                           handle.logs()))

            current_time = time.time()
            if deadline is not None and current_time >= deadline:
                raise Exception("Timed out while waiting for instance `{}' with ip address {} to start. "
                                "Container status: {}".format(self.name, self.ip_address, status))
            # No deadline means "no socket timeout"; otherwise the remaining time is strictly positive here.
            time_left = None if deadline is None else deadline - current_time

            # Repeatedly poll the instance address until there is something that listens there.
            # Usually it means that ClickHouse is ready to accept queries.
            # The socket is created outside `try` so that `finally` never sees an unbound name.
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.settimeout(time_left)
                sock.connect((self.ip_address, 9000))
                return
            except socket.timeout:
                continue
            except socket.error as e:
                if e.errno == errno.ECONNREFUSED:
                    time.sleep(0.1)
                else:
                    raise
            finally:
                sock.close()

    @staticmethod
    def dict_to_xml(dictionary):
        """Render `dictionary` as pretty-printed XML under a <yandex> root element."""
        xml_str = dicttoxml(dictionary, custom_root="yandex", attr_type=False)
        return xml.dom.minidom.parseString(xml_str).toprettyxml()

    @property
    def odbc_drivers(self):
        """ODBC data source settings keyed by driver name; empty when `odbc_ini_path` is not set."""
        if self.odbc_ini_path:
            return {
                "SQLite3": {
                    "DSN": "sqlite3_odbc",
                    "Database": "/tmp/sqliteodbc",
                    "Driver": "/usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so",
                    "Setup": "/usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so",
                },
                "MySQL": {
                    "DSN": "mysql_odbc",
                    "Driver": "/usr/lib/x86_64-linux-gnu/odbc/libmyodbc.so",
                    "Database": "clickhouse",
                    "Uid": "root",
                    "Pwd": "clickhouse",
                    "Server": "mysql1",
                },
                "PostgreSQL": {
                    "DSN": "postgresql_odbc",
                    "Database": "postgres",
                    "UserName": "postgres",
                    "Password": "mysecretpassword",
                    "Port": "5432",
                    "Servername": "postgres1",
                    "Protocol": "9.3",
                    "ReadOnly": "No",
                    "RowVersioning": "No",
                    "ShowSystemTables": "No",
                    "Driver": "/usr/lib/x86_64-linux-gnu/odbc/psqlodbca.so",
                    "Setup": "/usr/lib/x86_64-linux-gnu/odbc/libodbcpsqlS.so",
                    "ConnSettings": "",
                }
            }
        else:
            return {}

    def _create_odbc_config_file(self):
        """Write the odbc.ini file: one `[DSN]` section per entry of `odbc_drivers`."""
        # odbc_ini_path is "host path:container path"; the file is written on the host side.
        with open(self.odbc_ini_path.split(':')[0], 'w') as f:
            for driver_setup in self.odbc_drivers.values():
                f.write("[{}]\n".format(driver_setup["DSN"]))
                for key, value in driver_setup.items():
                    if key != "DSN":
                        f.write(key + "=" + value + "\n")

    def replace_config(self, path_to_config, replacement):
        """Overwrite `path_to_config` inside the container with `replacement` (via a single-quoted echo)."""
        self.exec_in_container(["bash", "-c", "echo '{}' > {}".format(replacement, path_to_config)])

    def create_dir(self, destroy_dir=True):
        """Create the instance directory and all the needed files there.

        With `destroy_dir` an existing directory is removed first; without it
        an existing directory is left untouched and the method returns early.
        """
        if destroy_dir:
            self.destroy_dir()
        elif p.exists(self.path):
            return

        os.makedirs(self.path)

        configs_dir = p.abspath(p.join(self.path, 'configs'))
        os.mkdir(configs_dir)

        shutil.copy(p.join(self.base_configs_dir, 'config.xml'), configs_dir)
        shutil.copy(p.join(self.base_configs_dir, 'users.xml'), configs_dir)

        # used by all utils with any config
        conf_d_dir = p.abspath(p.join(configs_dir, 'conf.d'))
        # used by server with main config.xml
        self.config_d_dir = p.abspath(p.join(configs_dir, 'config.d'))
        users_d_dir = p.abspath(p.join(configs_dir, 'users.d'))
        os.mkdir(conf_d_dir)
        os.mkdir(self.config_d_dir)
        os.mkdir(users_d_dir)

        # The file is named with 0_ prefix to be processed before other configuration overloads.
        shutil.copy(p.join(HELPERS_DIR, '0_common_instance_config.xml'), self.config_d_dir)
        shutil.copy(p.join(HELPERS_DIR, '0_common_instance_users.xml'), users_d_dir)

        # Generate and write macros file
        macros = self.macros.copy()
        macros['instance'] = self.name
        with open(p.join(self.config_d_dir, 'macros.xml'), 'w') as macros_config:
            macros_config.write(self.dict_to_xml({"macros": macros}))

        # Put ZooKeeper config
        if self.with_zookeeper:
            shutil.copy(self.zookeeper_config_path, conf_d_dir)

        # Copy config dir
        if self.custom_config_dir:
            distutils.dir_util.copy_tree(self.custom_config_dir, configs_dir)

        # Copy config.d configs
        for path in self.custom_main_config_paths:
            shutil.copy(path, self.config_d_dir)

        # Copy users.d configs
        for path in self.custom_user_config_paths:
            shutil.copy(path, users_d_dir)

        db_dir = p.abspath(p.join(self.path, 'database'))
        os.mkdir(db_dir)
        if self.clickhouse_path_dir is not None:
            distutils.dir_util.copy_tree(self.clickhouse_path_dir, db_dir)

        logs_dir = p.abspath(p.join(self.path, 'logs'))
        os.mkdir(logs_dir)

        # Compose services this instance depends on, in the same order as before.
        depends_on = []

        if self.with_mysql:
            depends_on.append("mysql1")

        if self.with_kafka:
            depends_on.extend(["kafka1", "schema-registry"])

        if self.with_zookeeper:
            depends_on.extend(["zoo1", "zoo2", "zoo3"])

        if self.with_minio:
            depends_on.extend(["minio1", "redirect"])

        env_file = _create_env_file(os.path.dirname(self.docker_compose_path), self.env_variables)

        odbc_ini_path = ""
        if self.odbc_ini_path:
            self._create_odbc_config_file()
            odbc_ini_path = '- ' + self.odbc_ini_path

        entrypoint_cmd = CLICKHOUSE_START_COMMAND

        if self.stay_alive:
            entrypoint_cmd = CLICKHOUSE_STAY_ALIVE_COMMAND

        # The "networks:" section is emitted only when an address or a hostname alias is requested.
        networks = app_net = ipv4_address = ipv6_address = net_aliases = net_alias1 = ""
        if self.ipv4_address is not None or self.ipv6_address is not None or self.hostname != self.name:
            networks = "networks:"
            app_net = "default:"
            if self.ipv4_address is not None:
                ipv4_address = "ipv4_address: " + self.ipv4_address
            if self.ipv6_address is not None:
                ipv6_address = "ipv6_address: " + self.ipv6_address
            if self.hostname != self.name:
                net_aliases = "aliases:"
                net_alias1 = "- " + self.hostname

        if not self.with_installed_binary:
            binary_volume = "- " + self.server_bin_path + ":/usr/bin/clickhouse"
            odbc_bridge_volume = "- " + self.odbc_bridge_bin_path + ":/usr/bin/clickhouse-odbc-bridge"
        else:
            # Mounted aside as "_fresh"; restart_with_latest_version() copies them into /usr/bin.
            binary_volume = "- " + self.server_bin_path + ":/usr/share/clickhouse_fresh"
            odbc_bridge_volume = "- " + self.odbc_bridge_bin_path + ":/usr/share/clickhouse-odbc-bridge_fresh"

        with open(self.docker_compose_path, 'w') as docker_compose:
            docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format(
                image=self.image,
                name=self.name,
                hostname=self.hostname,
                binary_volume=binary_volume,
                odbc_bridge_volume=odbc_bridge_volume,
                configs_dir=configs_dir,
                config_d_dir=self.config_d_dir,
                db_dir=db_dir,
                tmpfs=str(self.tmpfs),
                logs_dir=logs_dir,
                depends_on=str(depends_on),
                user=os.getuid(),
                env_file=env_file,
                odbc_ini_path=odbc_ini_path,
                entrypoint_cmd=entrypoint_cmd,
                networks=networks,
                app_net=app_net,
                ipv4_address=ipv4_address,
                ipv6_address=ipv6_address,
                net_aliases=net_aliases,
                net_alias1=net_alias1,
            ))

    def destroy_dir(self):
        """Remove the instance directory if it exists."""
        if p.exists(self.path):
            shutil.rmtree(self.path)
1046 1047 1048 1049 1050 1051 1052 1053 1054 1055


class ClickHouseKiller(object):
    """Context manager: kills ClickHouse on the given node on entry and restores it on exit.

    `__enter__` returns nothing, and `__exit__` does not suppress exceptions
    raised inside the `with` body.
    """

    def __init__(self, clickhouse_node):
        self.clickhouse_node = clickhouse_node

    def __enter__(self):
        node = self.clickhouse_node
        node.kill_clickhouse()

    def __exit__(self, exc_type, exc_val, exc_tb):
        node = self.clickhouse_node
        node.restore_clickhouse()