import os
import os.path as p
import pwd
import re
import subprocess
import shutil
import distutils.dir_util
import socket
import time
import errno

from dicttoxml import dicttoxml
import xml.dom.minidom

import docker
from docker.errors import ContainerError

from .client import Client, CommandRequest


HELPERS_DIR = p.dirname(__file__)


class ClickHouseCluster:
    """ClickHouse cluster with several instances and (possibly) ZooKeeper.

    Add instances with several calls to add_instance(), then start them with the start() call.

    Directories for instances are created in the directory of base_path. After cluster is started,
    these directories will contain logs, database files, docker-compose config, ClickHouse configs etc.
    """

    def __init__(self, base_path, name=None, base_configs_dir=None, server_bin_path=None, client_bin_path=None,
                 zookeeper_config_path=None):
        # All per-cluster files live next to the test file that created the cluster.
        self.base_dir = p.dirname(base_path)
        self.name = name if name is not None else ''

        # Explicit arguments win over environment variables, which win over defaults.
        self.base_configs_dir = base_configs_dir or os.environ.get('CLICKHOUSE_TESTS_BASE_CONFIG_DIR', '/etc/clickhouse-server/')
        self.server_bin_path = server_bin_path or os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH', '/usr/bin/clickhouse')
        self.client_bin_path = client_bin_path or os.environ.get('CLICKHOUSE_TESTS_CLIENT_BIN_PATH', '/usr/bin/clickhouse-client')
        # A relative zookeeper_config_path is resolved against base_dir; by default
        # the config bundled with the helpers is used.
        self.zookeeper_config_path = p.join(self.base_dir, zookeeper_config_path) if zookeeper_config_path else p.join(HELPERS_DIR, 'zookeeper_config.xml')

        # Make the compose project name unique per user / test directory / cluster name.
        self.project_name = pwd.getpwuid(os.getuid()).pw_name + p.basename(self.base_dir) + self.name
        # docker-compose removes everything non-alphanumeric from project names so we do it too.
        self.project_name = re.sub(r'[^a-z0-9]', '', self.project_name.lower())
        self.instances_dir = p.join(self.base_dir, '_instances' + ('' if not self.name else '_' + self.name))

        self.base_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.project_name]
        self.base_zookeeper_cmd = None
        # NOTE: attribute name keeps the historical misspelling for backward compatibility.
        self.pre_zookkeeper_commands = []
        self.instances = {}
        self.with_zookeeper = False

        self.docker_client = None
        self.is_up = False

    def add_instance(self, name, config_dir=None, main_configs=None, user_configs=None, macroses=None,
                     with_zookeeper=False, clickhouse_path_dir=None, hostname=None):
        """Add an instance to the cluster.

        name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
        config_dir - a directory with config files which content will be copied to /etc/clickhouse-server/ directory
        main_configs - a list of config files that will be added to config.d/ directory
        user_configs - a list of config files that will be added to users.d/ directory
        with_zookeeper - if True, add ZooKeeper configuration to configs and ZooKeeper instances to the cluster.
        """
        # None sentinels instead of mutable default arguments ([], {}), which would
        # be shared between calls. Passing explicit lists/dicts behaves as before.
        main_configs = main_configs if main_configs is not None else []
        user_configs = user_configs if user_configs is not None else []
        macroses = macroses if macroses is not None else {}

        if self.is_up:
            raise Exception("Can't add instance %s: cluster is already up!" % name)

        if name in self.instances:
            raise Exception("Can't add instance `%s': there is already an instance with the same name!" % name)

        instance = ClickHouseInstance(
            self, self.base_dir, name, config_dir, main_configs, user_configs, macroses, with_zookeeper,
            self.zookeeper_config_path, self.base_configs_dir, self.server_bin_path, clickhouse_path_dir, hostname=hostname)

        self.instances[name] = instance
        self.base_cmd.extend(['--file', instance.docker_compose_path])
        # The ZooKeeper compose file is added only once, for the first instance that needs it.
        if with_zookeeper and not self.with_zookeeper:
            self.with_zookeeper = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')])
            self.base_zookeeper_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                       self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')]

        return instance

    def get_instance_docker_id(self, instance_name):
        # According to how docker-compose names containers.
        return self.project_name + '_' + instance_name + '_1'

    def start(self, destroy_dirs=True):
        """Bring the whole cluster up and wait until every instance accepts connections.

        destroy_dirs - if True (default), wipe and recreate the per-instance directories.
        """
        if self.is_up:
            return

        # Just in case kill unstopped containers from previous launch.
        # This is best-effort cleanup: failures here must not prevent the new launch.
        try:
            if not subprocess.call(['docker-compose', 'kill']):
                subprocess.call(['docker-compose', 'down', '--volumes'])
        except Exception:
            pass

        if destroy_dirs and p.exists(self.instances_dir):
            # print() call form works on both Python 2 and 3 (single-argument).
            print('Removing instances dir ' + self.instances_dir)
            shutil.rmtree(self.instances_dir)

        for instance in self.instances.values():
            instance.create_dir(destroy_dir=destroy_dirs)

        self.docker_client = docker.from_env()

        # ZooKeeper must be up before ClickHouse instances that depend on it.
        if self.with_zookeeper and self.base_zookeeper_cmd:
            subprocess.check_call(self.base_zookeeper_cmd + ['up', '-d', '--no-recreate'])
            for command in self.pre_zookkeeper_commands:
                self.run_zookeeper_client_command(command, repeats=5)

        subprocess.check_call(self.base_cmd + ['up', '-d', '--no-recreate'])

        # One shared deadline for all instances: they start concurrently.
        start_deadline = time.time() + 20.0  # seconds
        for instance in self.instances.values():  # .itervalues() was Python 2 only
            instance.docker_client = self.docker_client

            container = self.docker_client.containers.get(instance.docker_id)
            # dict.values() is a view in Python 3, so materialize before indexing.
            instance.ip_address = list(container.attrs['NetworkSettings']['Networks'].values())[0]['IPAddress']

            instance.wait_for_start(start_deadline)

            instance.client = Client(instance.ip_address, command=self.client_bin_path)

        self.is_up = True

    def shutdown(self, kill=True):
        """Stop the cluster and detach all instance clients.

        kill - if True, forcibly kill containers before taking the project down.
        """
        if kill:
            subprocess.check_call(self.base_cmd + ['kill'])
        subprocess.check_call(self.base_cmd + ['down', '--volumes', '--remove-orphans'])
        self.is_up = False

        self.docker_client = None

        for instance in self.instances.values():
            instance.docker_client = None
            instance.ip_address = None
            instance.client = None

    def run_zookeeper_client_command(self, command, zoo_node='zoo1', repeats=1, sleep_for=1):
        """Run a zkCli.sh command inside a throwaway 'zookeeper' container.

        The container shares the network namespace of zoo_node, so zkCli connects
        to the local ZooKeeper. Retries up to repeats times; the final attempt
        propagates ContainerError to the caller.
        """
        cli_cmd = 'zkCli.sh  ' + command
        zoo_name = self.get_instance_docker_id(zoo_node)
        network_mode = 'container:' + zoo_name
        for i in range(0, repeats - 1):
            try:
                return self.docker_client.containers.run('zookeeper', cli_cmd, remove=True, network_mode=network_mode)
            except ContainerError:
                time.sleep(sleep_for)

        return self.docker_client.containers.run('zookeeper', cli_cmd, remove=True, network_mode=network_mode)

    def add_zookeeper_startup_command(self, command):
        # Queued commands are executed (via run_zookeeper_client_command) right
        # after ZooKeeper comes up in start().
        self.pre_zookkeeper_commands.append(command)


# docker-compose service definition for a single ClickHouse instance, filled in
# by ClickHouseInstance.create_dir(). The server binary and all config/data/log
# directories are bind-mounted from the host so tests can inspect them afterwards.
# Scrape artifacts that had leaked into this string literal were removed.
DOCKER_COMPOSE_TEMPLATE = '''
version: '2'
services:
    {name}:
        image: ubuntu:14.04
        hostname: {hostname}
        user: '{uid}'
        volumes:
            - {binary_path}:/usr/bin/clickhouse:ro
            - {configs_dir}:/etc/clickhouse-server/
            - {db_dir}:/var/lib/clickhouse/
            - {logs_dir}:/var/log/clickhouse-server/
        entrypoint:
            -  /usr/bin/clickhouse
            -  --config-file=/etc/clickhouse-server/config.xml
            -  --log-file=/var/log/clickhouse-server/clickhouse-server.log
        depends_on: {depends_on}
'''

class ClickHouseInstance:
    """A single ClickHouse server running in a docker container.

    Created by ClickHouseCluster.add_instance(); its directory (configs, database,
    logs, docker-compose file) is materialized by create_dir().
    """

    def __init__(
            self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macroses,
            with_zookeeper, zookeeper_config_path, base_configs_dir, server_bin_path, clickhouse_path_dir, hostname=None):

        self.name = name
        # Copy so later cluster-level extensions of base_cmd don't affect this instance.
        self.base_cmd = cluster.base_cmd[:]
        self.docker_id = cluster.get_instance_docker_id(self.name)
        self.cluster = cluster
        self.hostname = hostname if hostname is not None else self.name

        # Custom config paths are resolved relative to base_path (the test's directory).
        self.custom_config_dir = p.abspath(p.join(base_path, custom_config_dir)) if custom_config_dir else None
        self.custom_main_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_main_configs]
        self.custom_user_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_user_configs]
        self.clickhouse_path_dir = p.abspath(p.join(base_path, clickhouse_path_dir)) if clickhouse_path_dir else None
        self.macroses = macroses if macroses is not None else {}
        self.with_zookeeper = with_zookeeper
        self.zookeeper_config_path = zookeeper_config_path

        self.base_configs_dir = base_configs_dir
        self.server_bin_path = server_bin_path

        self.path = p.join(self.cluster.instances_dir, name)
        self.docker_compose_path = p.join(self.path, 'docker_compose.yml')

        # Filled in by ClickHouseCluster.start().
        self.docker_client = None
        self.ip_address = None
        self.client = None
        self.default_timeout = 20.0  # 20 sec

    # Connects to the instance via clickhouse-client, sends a query (1st argument) and returns the answer
    def query(self, *args, **kwargs):
        return self.client.query(*args, **kwargs)

    # As query() but doesn't wait response and returns response handler
    def get_query_request(self, *args, **kwargs):
        return self.client.get_query_request(*args, **kwargs)

    def exec_in_container(self, cmd, **kwargs):
        """Run cmd inside the container; return its stdout/stderr, raise on non-zero exit."""
        container = self.get_docker_handle()
        handle = self.docker_client.api.exec_create(container.id, cmd, **kwargs)
        output = self.docker_client.api.exec_start(handle).decode('utf8')
        exit_code = self.docker_client.api.exec_inspect(handle)['ExitCode']
        if exit_code:
            raise Exception('Cmd {} failed! Return code {}. Output {}'.format(' '.join(cmd), exit_code, output))
        return output

    def get_docker_handle(self):
        # Re-fetch on every call so container status/attrs are fresh.
        return self.docker_client.containers.get(self.docker_id)

    def stop(self):
        self.get_docker_handle().stop(self.default_timeout)

    def start(self):
        self.get_docker_handle().start()

    def wait_for_start(self, deadline=None, timeout=None):
        """Block until port 9000 of the instance accepts a TCP connection.

        deadline - absolute time (as returned by time.time()) to give up at.
        timeout - relative number of seconds; overrides deadline if given.
        With neither argument the method waits indefinitely.

        Raises if the container exits or the deadline passes first.
        """
        start_time = time.time()

        if timeout is not None:
            deadline = start_time + timeout

        while True:
            status = self.get_docker_handle().status

            if status == 'exited':
                raise Exception("Instance `{}' failed to start. Container status: {}".format(self.name, status))

            # BUGFIX: previously `deadline - current_time` was computed before the
            # None check, raising TypeError when called without deadline/timeout.
            time_left = None
            if deadline is not None:
                current_time = time.time()
                time_left = deadline - current_time
                if current_time >= deadline:
                    raise Exception("Timed out while waiting for instance `{}' with ip address {} to start. "
                                    "Container status: {}".format(self.name, self.ip_address, status))

            # Repeatedly poll the instance address until there is something that listens there.
            # Usually it means that ClickHouse is ready to accept queries.
            # Create the socket outside the try so `finally` never sees an unbound name.
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.settimeout(time_left)  # None means block indefinitely
                sock.connect((self.ip_address, 9000))
                return
            except socket.timeout:
                continue
            except socket.error as e:
                if e.errno == errno.ECONNREFUSED:
                    time.sleep(0.1)
                else:
                    raise
            finally:
                sock.close()

    @staticmethod
    def dict_to_xml(dictionary):
        """Render a dict as pretty-printed XML under a <yandex> root element."""
        xml_str = dicttoxml(dictionary, custom_root="yandex", attr_type=False)
        return xml.dom.minidom.parseString(xml_str).toprettyxml()

    def create_dir(self, destroy_dir=True):
        """Create the instance directory and all the needed files there."""

        if destroy_dir:
            self.destroy_dir()
        elif p.exists(self.path):
            # Directory already prepared by a previous run; reuse it as-is.
            return

        os.makedirs(self.path)

        configs_dir = p.abspath(p.join(self.path, 'configs'))
        os.mkdir(configs_dir)

        # Start from the stock server configs, then layer customizations on top.
        shutil.copy(p.join(self.base_configs_dir, 'config.xml'), configs_dir)
        shutil.copy(p.join(self.base_configs_dir, 'users.xml'), configs_dir)

        config_d_dir = p.abspath(p.join(configs_dir, 'config.d'))
        users_d_dir = p.abspath(p.join(configs_dir, 'users.d'))
        os.mkdir(config_d_dir)
        os.mkdir(users_d_dir)

        shutil.copy(p.join(HELPERS_DIR, 'common_instance_config.xml'), config_d_dir)

        # Generate and write macroses file; the 'instance' macro is always set.
        macroses = self.macroses.copy()
        macroses['instance'] = self.name
        with open(p.join(config_d_dir, 'macros.xml'), 'w') as macros_config:
            macros_config.write(self.dict_to_xml({"macros" : macroses}))

        # Put ZooKeeper config
        if self.with_zookeeper:
            shutil.copy(self.zookeeper_config_path, config_d_dir)

        # Copy config dir
        if self.custom_config_dir:
            distutils.dir_util.copy_tree(self.custom_config_dir, configs_dir)

        # Copy config.d configs
        for path in self.custom_main_config_paths:
            shutil.copy(path, config_d_dir)

        # Copy users.d configs
        for path in self.custom_user_config_paths:
            shutil.copy(path, users_d_dir)

        db_dir = p.abspath(p.join(self.path, 'database'))
        os.mkdir(db_dir)
        if self.clickhouse_path_dir is not None:
            distutils.dir_util.copy_tree(self.clickhouse_path_dir, db_dir)

        logs_dir = p.abspath(p.join(self.path, 'logs'))
        os.mkdir(logs_dir)

        depends_on = '[]'
        if self.with_zookeeper:
            depends_on = '["zoo1", "zoo2", "zoo3"]'

        with open(self.docker_compose_path, 'w') as docker_compose:
            # config_d_dir is passed for compatibility even though the current
            # template does not reference it; str.format ignores extra kwargs.
            docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format(
                name=self.name,
                hostname=self.hostname,
                uid=os.getuid(),
                binary_path=self.server_bin_path,
                configs_dir=configs_dir,
                config_d_dir=config_d_dir,
                db_dir=db_dir,
                logs_dir=logs_dir,
                depends_on=depends_on))

    def destroy_dir(self):
        if p.exists(self.path):
            shutil.rmtree(self.path)