cluster.py 19.9 KB
Newer Older
1 2
import os
import os.path as p
3
import pwd
4 5 6
import re
import subprocess
import shutil
7
import distutils.dir_util
8 9 10
import socket
import time
import errno
11
from dicttoxml import dicttoxml
A
alesapin 已提交
12
import pymysql
13
import xml.dom.minidom
14 15
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
16 17

import docker
18
from docker.errors import ContainerError
19

20
from .client import Client, CommandRequest
21 22 23


# Directory that contains this module; the docker-compose files and default
# configs referenced elsewhere in this file are resolved relative to it.
HELPERS_DIR = p.dirname(__file__)
# Default name of the environment file written by _create_env_file().
DEFAULT_ENV_NAME = 'env_file'


def _create_env_file(path, variables, fname=DEFAULT_ENV_NAME):
    """Write environment variables to a file, one NAME=value pair per line.

    path - directory in which the file is created (it must already exist)
    variables - mapping of variable names to values; values are converted to text,
                so numbers are accepted as well as strings. None means "no variables".
    fname - name of the file inside `path`

    Returns the full path of the written file. An existing file is overwritten.
    """
    full_path = os.path.join(path, fname)
    with open(full_path, 'w') as f:
        # format() instead of str.join() so that non-string values do not raise TypeError.
        f.writelines("{}={}\n".format(var, value) for var, value in (variables or {}).items())
    return full_path

33 34 35 36 37 38 39 40 41
class ClickHouseCluster:
    """ClickHouse cluster with several instances and (possibly) ZooKeeper.

    Add instances with several calls to add_instance(), then start them with the start() call.

    Directories for instances are created in the directory of base_path. After cluster is started,
    these directories will contain logs, database files, docker-compose config, ClickHouse configs etc.
    """

42
    def __init__(self, base_path, name=None, base_configs_dir=None, server_bin_path=None, client_bin_path=None,
43
                 zookeeper_config_path=None, custom_dockerd_host=None):
44
        self.base_dir = p.dirname(base_path)
45
        self.name = name if name is not None else ''
46 47 48 49

        self.base_configs_dir = base_configs_dir or os.environ.get('CLICKHOUSE_TESTS_BASE_CONFIG_DIR', '/etc/clickhouse-server/')
        self.server_bin_path = server_bin_path or os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH', '/usr/bin/clickhouse')
        self.client_bin_path = client_bin_path or os.environ.get('CLICKHOUSE_TESTS_CLIENT_BIN_PATH', '/usr/bin/clickhouse-client')
50
        self.zookeeper_config_path = p.join(self.base_dir, zookeeper_config_path) if zookeeper_config_path else p.join(HELPERS_DIR, 'zookeeper_config.xml')
51

52
        self.project_name = pwd.getpwuid(os.getuid()).pw_name + p.basename(self.base_dir) + self.name
53 54
        # docker-compose removes everything non-alphanumeric from project names so we do it too.
        self.project_name = re.sub(r'[^a-z0-9]', '', self.project_name.lower())
55
        self.instances_dir = p.join(self.base_dir, '_instances' + ('' if not self.name else '_' + self.name))
56

57
        custom_dockerd_host = custom_dockerd_host or os.environ.get('CLICKHOUSE_TESTS_DOCKERD_HOST')
58 59 60 61 62 63

        self.base_cmd = ['docker-compose']
        if custom_dockerd_host:
            self.base_cmd += ['--host', custom_dockerd_host]

        self.base_cmd += ['--project-directory', self.base_dir, '--project-name', self.project_name]
64
        self.base_zookeeper_cmd = None
S
sundy-li 已提交
65
        self.base_mysql_cmd = []
66
        self.base_kafka_cmd = []
67
        self.pre_zookeeper_commands = []
68 69
        self.instances = {}
        self.with_zookeeper = False
S
sundy-li 已提交
70
        self.with_mysql = False
71
        self.with_kafka = False
A
alesapin 已提交
72
        self.with_odbc_drivers = False
73

74
        self.docker_client = None
75 76 77
        self.is_up = False


A
alesapin 已提交
78
    def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, hostname=None, env_variables={}, image="ubuntu:14.04"):
79 80 81
        """Add an instance to the cluster.

        name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
82 83 84
        config_dir - a directory with config files which content will be copied to /etc/clickhouse-server/ directory
        main_configs - a list of config files that will be added to config.d/ directory
        user_configs - a list of config files that will be added to users.d/ directory
85 86 87 88
        with_zookeeper - if True, add ZooKeeper configuration to configs and ZooKeeper instances to the cluster.
        """

        if self.is_up:
89
            raise Exception("Can\'t add instance %s: cluster is already up!" % name)
90 91

        if name in self.instances:
92
            raise Exception("Can\'t add instance `%s': there is already an instance with the same name!" % name)
93

94
        instance = ClickHouseInstance(
95
            self, self.base_dir, name, config_dir, main_configs, user_configs, macros, with_zookeeper,
A
alesapin 已提交
96 97
            self.zookeeper_config_path, with_mysql, with_kafka, self.base_configs_dir, self.server_bin_path,
            clickhouse_path_dir, with_odbc_drivers, hostname=hostname, env_variables=env_variables, image=image)
98

99 100 101 102 103
        self.instances[name] = instance
        self.base_cmd.extend(['--file', instance.docker_compose_path])
        if with_zookeeper and not self.with_zookeeper:
            self.with_zookeeper = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')])
104 105
            self.base_zookeeper_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                       self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')]
106

S
sundy-li 已提交
107 108 109 110 111
        if with_mysql and not self.with_mysql:
            self.with_mysql = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')])
            self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                       self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]
112

A
alesapin 已提交
113 114 115 116 117 118 119 120
        if with_odbc_drivers and not self.with_odbc_drivers:
            self.with_odbc_drivers = True
            if not self.with_mysql:
                self.with_mysql = True
                self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')])
                self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                       self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]

121 122 123 124 125 126
        if with_kafka and not self.with_kafka:
            self.with_kafka = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_kafka.yml')])
            self.base_kafka_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                       self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_kafka.yml')]

127 128 129
        return instance


130 131 132 133 134
    def get_instance_docker_id(self, instance_name):
        # According to how docker-compose names containers.
        return self.project_name + '_' + instance_name + '_1'


135 136 137 138 139
    def get_instance_ip(self, instance_name):
        docker_id = self.get_instance_docker_id(instance_name)
        handle = self.docker_client.containers.get(docker_id)
        return handle.attrs['NetworkSettings']['Networks'].values()[0]['IPAddress']

A
alesapin 已提交
140 141 142 143 144 145 146 147 148 149 150 151 152
    def wait_mysql_to_start(self, timeout=60):
        start = time.time()
        while time.time() - start < timeout:
            try:
                conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=3308)
                conn.close()
                print "Mysql Started"
                return
            except Exception:
                time.sleep(0.5)

        raise Exception("Cannot wait MySQL container")

153

154 155 156 157
    def start(self, destroy_dirs=True):
        if self.is_up:
            return

158
        # Just in case kill unstopped containers from previous launch
159
        try:
160 161
            if not subprocess.call(['docker-compose', 'kill']):
                subprocess.call(['docker-compose', 'down', '--volumes'])
162 163 164
        except:
            pass

165 166
        if destroy_dirs and p.exists(self.instances_dir):
            print "Removing instances dir", self.instances_dir
167 168
            shutil.rmtree(self.instances_dir)

169 170 171
        for instance in self.instances.values():
            instance.create_dir(destroy_dir=destroy_dirs)

172 173
        self.docker_client = docker.from_env()

174 175
        if self.with_zookeeper and self.base_zookeeper_cmd:
            subprocess.check_call(self.base_zookeeper_cmd + ['up', '-d', '--no-recreate'])
176
            for command in self.pre_zookeeper_commands:
177
                self.run_kazoo_commands_with_retries(command, repeats=5)
178

S
sundy-li 已提交
179 180
        if self.with_mysql and self.base_mysql_cmd:
            subprocess.check_call(self.base_mysql_cmd + ['up', '-d', '--no-recreate'])
A
alesapin 已提交
181
            self.wait_mysql_to_start()
S
sundy-li 已提交
182

183 184
        if self.with_kafka and self.base_kafka_cmd:
            subprocess.check_call(self.base_kafka_cmd + ['up', '-d', '--no-recreate'])
A
Amos Bird 已提交
185
            self.kafka_docker_id = self.get_instance_docker_id('kafka1')
186

187
        # Uncomment for debugging
188
        #print ' '.join(self.base_cmd + ['up', '--no-recreate'])
189

190 191
        subprocess.check_call(self.base_cmd + ['up', '-d', '--no-recreate'])

192 193
        start_deadline = time.time() + 20.0 # seconds
        for instance in self.instances.itervalues():
194
            instance.docker_client = self.docker_client
195
            instance.ip_address = self.get_instance_ip(instance.name)
196

197
            instance.wait_for_start(start_deadline)
198 199 200 201 202 203 204 205 206

            instance.client = Client(instance.ip_address, command=self.client_bin_path)

        self.is_up = True


    def shutdown(self, kill=True):
        if kill:
            subprocess.check_call(self.base_cmd + ['kill'])
207
        subprocess.check_call(self.base_cmd + ['down', '--volumes', '--remove-orphans'])
208 209
        self.is_up = False

210 211
        self.docker_client = None

212
        for instance in self.instances.values():
213
            instance.docker_client = None
214 215 216 217
            instance.ip_address = None
            instance.client = None


218 219 220 221 222 223 224 225
    def get_kazoo_client(self, zoo_instance_name):
        """Return a started KazooClient connected to the given ZooKeeper instance."""
        zoo_ip = self.get_instance_ip(zoo_instance_name)
        client = KazooClient(hosts=zoo_ip)
        client.start()
        return client


    def run_kazoo_commands_with_retries(self, kazoo_callback, zoo_instance_name = 'zoo1', repeats=1, sleep_for=1):
        for i in range(repeats - 1):
226
            try:
227 228 229 230
                kazoo_callback(self.get_kazoo_client(zoo_instance_name))
                return
            except KazooException as e:
                print repr(e)
231 232
                time.sleep(sleep_for)

233 234
        kazoo_callback(self.get_kazoo_client(zoo_instance_name))

235 236

    def add_zookeeper_startup_command(self, command):
237
        self.pre_zookeeper_commands.append(command)
238 239


240 241 242 243
# docker-compose file of a single ClickHouse instance, filled in with str.format().
# Placeholders: name, image, hostname, uid, binary_path, configs_dir, db_dir, logs_dir,
# odbc_ini_path (either empty or a complete "- host:container" volume entry),
# depends_on (a list rendered in YAML flow style) and env_file.
DOCKER_COMPOSE_TEMPLATE = '''
version: '2'
services:
    {name}:
        image: {image}
        hostname: {hostname}
        user: '{uid}'
        volumes:
            - {binary_path}:/usr/bin/clickhouse:ro
            - {configs_dir}:/etc/clickhouse-server/
            - {db_dir}:/var/lib/clickhouse/
            - {logs_dir}:/var/log/clickhouse-server/
            {odbc_ini_path}
        entrypoint:
            -  /usr/bin/clickhouse
            -  server
            -  --config-file=/etc/clickhouse-server/config.xml
            -  --log-file=/var/log/clickhouse-server/clickhouse-server.log
            -  --errorlog-file=/var/log/clickhouse-server/clickhouse-server.err.log
        depends_on: {depends_on}
        env_file:
            - {env_file}
'''

264

265
class ClickHouseInstance:
A
alesapin 已提交
266

267
    def __init__(
268
            self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros,
A
alesapin 已提交
269 270
            with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, base_configs_dir, server_bin_path,
            clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables={}, image="ubuntu:14.04"):
271 272

        self.name = name
273 274 275
        self.base_cmd = cluster.base_cmd[:]
        self.docker_id = cluster.get_instance_docker_id(self.name)
        self.cluster = cluster
276
        self.hostname = hostname if hostname is not None else self.name
277 278 279 280

        self.custom_config_dir = p.abspath(p.join(base_path, custom_config_dir)) if custom_config_dir else None
        self.custom_main_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_main_configs]
        self.custom_user_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_user_configs]
281
        self.clickhouse_path_dir = p.abspath(p.join(base_path, clickhouse_path_dir)) if clickhouse_path_dir else None
282
        self.macros = macros if macros is not None else {}
283
        self.with_zookeeper = with_zookeeper
284
        self.zookeeper_config_path = zookeeper_config_path
285 286 287 288

        self.base_configs_dir = base_configs_dir
        self.server_bin_path = server_bin_path

S
sundy-li 已提交
289
        self.with_mysql = with_mysql
290
        self.with_kafka = with_kafka
S
sundy-li 已提交
291

292
        self.path = p.join(self.cluster.instances_dir, name)
293
        self.docker_compose_path = p.join(self.path, 'docker_compose.yml')
294
        self.env_variables = env_variables
A
alesapin 已提交
295 296 297 298 299
        if with_odbc_drivers:
            self.odbc_ini_path = os.path.dirname(self.docker_compose_path) + "/odbc.ini:/etc/odbc.ini"
            self.with_mysql = True
        else:
            self.odbc_ini_path = ""
300

301
        self.docker_client = None
302 303
        self.ip_address = None
        self.client = None
304
        self.default_timeout = 20.0 # 20 sec
A
alesapin 已提交
305
        self.image = image
306

A
Alexey Milovidov 已提交
307
    # Connects to the instance via clickhouse-client, sends a query (1st argument) and returns the answer
308 309 310
    def query(self, *args, **kwargs):
        return self.client.query(*args, **kwargs)

311
    # Same as query(), but doesn't wait for the response and returns a response handler
312 313 314
    def get_query_request(self, *args, **kwargs):
        return self.client.get_query_request(*args, **kwargs)

315

316 317
    def exec_in_container(self, cmd, **kwargs):
        container = self.get_docker_handle()
318 319 320 321 322
        exec_id = self.docker_client.api.exec_create(container.id, cmd, **kwargs)
        output = self.docker_client.api.exec_start(exec_id, detach=False)

        output = output.decode('utf8')
        exit_code = self.docker_client.api.exec_inspect(exec_id)['ExitCode']
323
        if exit_code:
324
            raise Exception('Cmd "{}" failed! Return code {}. Output: {}'.format(' '.join(cmd), exit_code, output))
325 326 327
        return output


328 329
    def get_docker_handle(self):
        return self.docker_client.containers.get(self.docker_id)
330 331


332 333 334 335 336 337 338 339 340 341 342 343 344
    def stop(self):
        self.get_docker_handle().stop(self.default_timeout)


    def start(self):
        self.get_docker_handle().start()


    def wait_for_start(self, deadline=None, timeout=None):
        start_time = time.time()

        if timeout is not None:
            deadline = start_time + timeout
345 346

        while True:
347 348
            handle = self.get_docker_handle()
            status = handle.status;
349
            if status == 'exited':
350
                raise Exception("Instance `{}' failed to start. Container status: {}, logs: {}".format(self.name, status, handle.logs()))
351 352 353 354 355 356

            current_time = time.time()
            time_left = deadline - current_time
            if deadline is not None and current_time >= deadline:
                raise Exception("Timed out while waiting for instance `{}' with ip address {} to start. "
                                "Container status: {}".format(self.name, self.ip_address, status))
357 358 359 360 361

            # Repeatedly poll the instance address until there is something that listens there.
            # Usually it means that ClickHouse is ready to accept queries.
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
362
                sock.settimeout(time_left)
363 364
                sock.connect((self.ip_address, 9000))
                return
365 366
            except socket.timeout:
                continue
367 368 369 370 371 372 373 374 375
            except socket.error as e:
                if e.errno == errno.ECONNREFUSED:
                    time.sleep(0.1)
                else:
                    raise
            finally:
                sock.close()


376 377 378 379 380
    @staticmethod
    def dict_to_xml(dictionary):
        """Render `dictionary` as pretty-printed XML under a <yandex> root element."""
        raw_xml = dicttoxml(dictionary, custom_root="yandex", attr_type=False)
        document = xml.dom.minidom.parseString(raw_xml)
        return document.toprettyxml()

A
alesapin 已提交
381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
    @property
    def odbc_drivers(self):
        if self.odbc_ini_path:
            return {
                "SQLite3": {
                    "DSN": "sqlite3_odbc",
                    "Database" : "/tmp/sqliteodbc",
                    "Driver": "/usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so",
                    "Setup": "/usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so",
                },
                "MySQL": {
                    "DSN": "mysql_odbc",
                    "Driver": "/usr/lib/x86_64-linux-gnu/odbc/libmyodbc.so",
                    "Database": "clickhouse",
                    "Uid": "root",
                    "Pwd": "clickhouse",
                    "Server": "mysql1",
                },
                "PostgreSQL": {
                    "DSN": "postgresql_odbc",
                    "Driver": "/usr/lib/x86_64-linux-gnu/odbc/psqlodbca.so",
                    "Setup": "/usr/lib/x86_64-linux-gnu/odbc/libodbcpsqlS.so",
                }
            }
        else:
            return {}

    def _create_odbc_config_file(self):
        with open(self.odbc_ini_path.split(':')[0], 'w') as f:
            for driver_setup in self.odbc_drivers.values():
                f.write("[{}]\n".format(driver_setup["DSN"]))
                for key, value in driver_setup.items():
                    if key != "DSN":
                        f.write(key + "=" + value + "\n")
415

416 417 418 419 420 421 422 423
    def create_dir(self, destroy_dir=True):
        """Create the instance directory and all the needed files there."""

        if destroy_dir:
            self.destroy_dir()
        elif p.exists(self.path):
            return

424
        os.makedirs(self.path)
425

426
        configs_dir = p.abspath(p.join(self.path, 'configs'))
427 428 429 430 431
        os.mkdir(configs_dir)

        shutil.copy(p.join(self.base_configs_dir, 'config.xml'), configs_dir)
        shutil.copy(p.join(self.base_configs_dir, 'users.xml'), configs_dir)

432 433
        config_d_dir = p.abspath(p.join(configs_dir, 'config.d'))
        users_d_dir = p.abspath(p.join(configs_dir, 'users.d'))
434
        os.mkdir(config_d_dir)
A
Alexey Zatelepin 已提交
435
        os.mkdir(users_d_dir)
436 437 438

        shutil.copy(p.join(HELPERS_DIR, 'common_instance_config.xml'), config_d_dir)

439 440 441
        # Generate and write macros file
        macros = self.macros.copy()
        macros['instance'] = self.name
442
        with open(p.join(config_d_dir, 'macros.xml'), 'w') as macros_config:
443
            macros_config.write(self.dict_to_xml({"macros" : macros}))
444

445
        # Put ZooKeeper config
446
        if self.with_zookeeper:
447
            shutil.copy(self.zookeeper_config_path, config_d_dir)
448

449 450 451 452 453 454
        # Copy config dir
        if self.custom_config_dir:
            distutils.dir_util.copy_tree(self.custom_config_dir, configs_dir)

        # Copy config.d configs
        for path in self.custom_main_config_paths:
455 456
            shutil.copy(path, config_d_dir)

457 458 459 460
        # Copy users.d configs
        for path in self.custom_user_config_paths:
            shutil.copy(path, users_d_dir)

461
        db_dir = p.abspath(p.join(self.path, 'database'))
462
        os.mkdir(db_dir)
463 464
        if self.clickhouse_path_dir is not None:
            distutils.dir_util.copy_tree(self.clickhouse_path_dir, db_dir)
465

466
        logs_dir = p.abspath(p.join(self.path, 'logs'))
467 468
        os.mkdir(logs_dir)

S
sundy-li 已提交
469 470 471 472 473
        depends_on = []

        if self.with_mysql:
            depends_on.append("mysql1")

474 475 476
        if self.with_kafka:
            depends_on.append("kafka1")

477
        if self.with_zookeeper:
S
sundy-li 已提交
478 479 480
            depends_on.append("zoo1")
            depends_on.append("zoo2")
            depends_on.append("zoo3")
481

482 483
        env_file = _create_env_file(os.path.dirname(self.docker_compose_path), self.env_variables)

A
alesapin 已提交
484 485 486 487 488
        odbc_ini_path = ""
        if self.odbc_ini_path:
            self._create_odbc_config_file()
            odbc_ini_path = '- ' + self.odbc_ini_path

489 490
        with open(self.docker_compose_path, 'w') as docker_compose:
            docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format(
A
alesapin 已提交
491
                image=self.image,
492
                name=self.name,
493
                hostname=self.hostname,
494 495 496 497
                uid=os.getuid(),
                binary_path=self.server_bin_path,
                configs_dir=configs_dir,
                config_d_dir=config_d_dir,
S
sundy-li 已提交
498
                db_dir=db_dir,
499
                logs_dir=logs_dir,
500
                depends_on=str(depends_on),
A
alesapin 已提交
501 502 503
                env_file=env_file,
                odbc_ini_path=odbc_ini_path,
            ))
504 505 506 507 508


    def destroy_dir(self):
        if p.exists(self.path):
            shutil.rmtree(self.path)