# Helpers for running a ClickHouse integration-test cluster with docker-compose.
import os
import os.path as p
import pwd
import re
import subprocess
import shutil
import distutils.dir_util
import socket
import time
import errno
from dicttoxml import dicttoxml
import xml.dom.minidom

import docker

from .client import Client


# Directory containing this helpers module; auxiliary files shipped alongside it
# (docker_compose_zookeeper.yml, common_instance_config.xml, zookeeper_config.xml)
# are looked up relative to this path.
HELPERS_DIR = p.dirname(__file__)


class ClickHouseCluster:
    """ClickHouse cluster with several instances and (possibly) ZooKeeper.

    Add instances with several calls to add_instance(), then start them with the start() call.

    Directories for instances are created in the directory of base_path. After cluster is started,
    these directories will contain logs, database files, docker-compose config, ClickHouse configs etc.
    """

    def __init__(self, base_path, name=None, base_configs_dir=None, server_bin_path=None, client_bin_path=None):
        # base_path is usually the test module's __file__; instance dirs live next to it.
        self.base_dir = p.dirname(base_path)
        self.name = name if name is not None else ''

        # Explicit arguments win; otherwise fall back to environment variables,
        # then to system-wide defaults.
        self.base_configs_dir = base_configs_dir or os.environ.get('CLICKHOUSE_TESTS_BASE_CONFIG_DIR', '/etc/clickhouse-server/')
        self.server_bin_path = server_bin_path or os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH', '/usr/bin/clickhouse')
        self.client_bin_path = client_bin_path or os.environ.get('CLICKHOUSE_TESTS_CLIENT_BIN_PATH', '/usr/bin/clickhouse-client')

        # Make the compose project name unique per user and per test directory so
        # concurrent runs on one machine do not clash.
        self.project_name = pwd.getpwuid(os.getuid()).pw_name + p.basename(self.base_dir) + self.name
        # docker-compose removes everything non-alphanumeric from project names so we do it too.
        self.project_name = re.sub(r'[^a-z0-9]', '', self.project_name.lower())

        self.base_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.project_name]
        self.instances = {}
        self.with_zookeeper = False

        self.docker_client = None
        self.is_up = False


    def add_instance(self, name, config_dir=None, main_configs=None, user_configs=None, macroses=None, with_zookeeper=False, clickhouse_path_dir=None):
        """Add an instance to the cluster.

        name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
        config_dir - a directory with config files which content will be copied to /etc/clickhouse-server/ directory
        main_configs - a list of config files that will be added to config.d/ directory
        user_configs - a list of config files that will be added to users.d/ directory
        macroses - a dict of extra <macros> values written to the instance config
        with_zookeeper - if True, add ZooKeeper configuration to configs and ZooKeeper instances to the cluster.
        clickhouse_path_dir - a directory whose content is copied into /var/lib/clickhouse before start

        Returns the created ClickHouseInstance.
        """

        # NOTE: mutable containers must not be used as default argument values
        # (they would be shared between calls), hence the None defaults.
        main_configs = main_configs or []
        user_configs = user_configs or []
        macroses = macroses or {}

        if self.is_up:
            raise Exception("Can't add instance %s: cluster is already up!" % name)

        if name in self.instances:
            raise Exception("Can't add instance `%s': there is already an instance with the same name!" % name)

        instance = ClickHouseInstance(
            self, self.base_dir, name, config_dir, main_configs, user_configs, macroses,
            with_zookeeper, self.base_configs_dir, self.server_bin_path, clickhouse_path_dir)

        self.instances[name] = instance
        self.base_cmd.extend(['--file', instance.docker_compose_path])
        if with_zookeeper and not self.with_zookeeper:
            self.with_zookeeper = True
            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')])

        return instance


    def get_instance_docker_id(self, instance_name):
        # According to how docker-compose names containers.
        return self.project_name + '_' + instance_name + '_1'


    def start(self, destroy_dirs=True):
        """Bring the whole cluster up and wait until every instance accepts queries.

        destroy_dirs - if True, recreate each instance directory from scratch.
        """
        if self.is_up:
            return

        for instance in self.instances.values():
            instance.create_dir(destroy_dir=destroy_dirs)

        subprocess.check_call(self.base_cmd + ['up', '-d'])

        self.docker_client = docker.from_env()

        # One shared deadline for all instances: they start in parallel,
        # so the total wait is bounded, not per-instance.
        start_deadline = time.time() + 20.0 # seconds
        for instance in self.instances.values():
            instance.docker_client = self.docker_client

            container = self.docker_client.containers.get(instance.docker_id)
            # The compose project puts every container on a single network,
            # so the first (and only) entry holds the instance's IP.
            networks = container.attrs['NetworkSettings']['Networks']
            instance.ip_address = list(networks.values())[0]['IPAddress']

            instance.wait_for_start(start_deadline)

            instance.client = Client(instance.ip_address, command=self.client_bin_path)

        self.is_up = True


    def shutdown(self, kill=True):
        """Stop and remove all containers (and volumes) of the cluster.

        kill - if True, send SIGKILL first instead of waiting for graceful stop.
        """
        if kill:
            subprocess.check_call(self.base_cmd + ['kill'])
        subprocess.check_call(self.base_cmd + ['down', '--volumes'])
        self.is_up = False

        self.docker_client = None

        for instance in self.instances.values():
            instance.docker_client = None
            instance.ip_address = None
            instance.client = None


# docker-compose service definition for a single ClickHouse instance.
# Filled in by ClickHouseInstance.create_dir(); extra .format() kwargs are ignored.
DOCKER_COMPOSE_TEMPLATE = '''
version: '2'
services:
    {name}:
        image: ubuntu:14.04
        hostname: {name}
        user: '{uid}'
        volumes:
            - {binary_path}:/usr/bin/clickhouse:ro
            - {configs_dir}:/etc/clickhouse-server/
            - {db_dir}:/var/lib/clickhouse/
            - {logs_dir}:/var/log/clickhouse-server/
        entrypoint:
            -  /usr/bin/clickhouse
            -  --config-file=/etc/clickhouse-server/config.xml
            -  --log-file=/var/log/clickhouse-server/clickhouse-server.log
        depends_on: {depends_on}
'''

class ClickHouseInstance:
    """A single ClickHouse server running in a docker container, managed by a ClickHouseCluster."""

    def __init__(
            self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macroses,
            with_zookeeper, base_configs_dir, server_bin_path, clickhouse_path_dir):

        self.name = name
        self.base_cmd = cluster.base_cmd[:]
        self.docker_id = cluster.get_instance_docker_id(self.name)
        self.cluster = cluster

        # Resolve all user-supplied config paths relative to base_path.
        self.custom_config_dir = p.abspath(p.join(base_path, custom_config_dir)) if custom_config_dir else None
        self.custom_main_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_main_configs]
        self.custom_user_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_user_configs]
        self.clickhouse_path_dir = p.abspath(p.join(base_path, clickhouse_path_dir)) if clickhouse_path_dir else None
        self.macroses = macroses if macroses is not None else {}
        self.with_zookeeper = with_zookeeper

        self.base_configs_dir = base_configs_dir
        self.server_bin_path = server_bin_path

        # Per-instance working directory, e.g. <base_path>/_instances[_<cluster>]/<name>.
        suffix = '_instances' + ('' if not self.cluster.name else '_' + self.cluster.name)
        self.path = p.abspath(p.join(base_path, suffix, name))
        self.docker_compose_path = p.join(self.path, 'docker_compose.yml')

        # Filled in by ClickHouseCluster.start().
        self.docker_client = None
        self.ip_address = None
        self.client = None
        self.default_timeout = 20.0 # 20 sec


    def query(self, *args, **kwargs):
        """Run a query through the attached Client (available after cluster start)."""
        return self.client.query(*args, **kwargs)


    def get_query_request(self, *args, **kwargs):
        """Start an asynchronous query through the attached Client."""
        return self.client.get_query_request(*args, **kwargs)


    def get_docker_handle(self):
        """Return the docker-py Container object for this instance."""
        return self.docker_client.containers.get(self.docker_id)


    def stop(self):
        self.get_docker_handle().stop(self.default_timeout)


    def start(self):
        self.get_docker_handle().start()


    def wait_for_start(self, deadline=None, timeout=None):
        """Wait until the server accepts TCP connections on port 9000.

        deadline - absolute time (time.time()) after which to give up; None means wait forever.
        timeout - relative timeout in seconds; if given, overrides deadline.

        Raises Exception if the container exits or the deadline passes.
        """
        start_time = time.time()

        if timeout is not None:
            deadline = start_time + timeout

        while True:
            status = self.get_docker_handle().status

            if status == 'exited':
                raise Exception("Instance `{}' failed to start. Container status: {}".format(self.name, status))

            # With no deadline we wait indefinitely; sock.settimeout(None)
            # below then makes connect() block until it succeeds or errors.
            time_left = None
            if deadline is not None:
                current_time = time.time()
                time_left = deadline - current_time
                if current_time >= deadline:
                    raise Exception("Timed out while waiting for instance `{}' with ip address {} to start. "
                                    "Container status: {}".format(self.name, self.ip_address, status))

            # Repeatedly poll the instance address until there is something that listens there.
            # Usually it means that ClickHouse is ready to accept queries.
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(time_left)
                sock.connect((self.ip_address, 9000))
                return
            except socket.timeout:
                continue
            except socket.error as e:
                if e.errno == errno.ECONNREFUSED:
                    # Server not listening yet; back off briefly and retry.
                    time.sleep(0.1)
                else:
                    raise
            finally:
                sock.close()


    @staticmethod
    def dict_to_xml(dictionary):
        """Render a dict as pretty-printed XML under a <yandex> root element."""
        xml_str = dicttoxml(dictionary, custom_root="yandex", attr_type=False)
        return xml.dom.minidom.parseString(xml_str).toprettyxml()


    def create_dir(self, destroy_dir=True):
        """Create the instance directory and all the needed files there."""

        if destroy_dir:
            self.destroy_dir()
        elif p.exists(self.path):
            # Directory already prepared from a previous run; reuse it.
            return

        os.makedirs(self.path)

        configs_dir = p.join(self.path, 'configs')
        os.mkdir(configs_dir)

        # Start from the base server configs, then layer customizations on top.
        shutil.copy(p.join(self.base_configs_dir, 'config.xml'), configs_dir)
        shutil.copy(p.join(self.base_configs_dir, 'users.xml'), configs_dir)

        config_d_dir = p.join(configs_dir, 'config.d')
        users_d_dir = p.join(configs_dir, 'users.d')
        os.mkdir(config_d_dir)
        os.mkdir(users_d_dir)

        shutil.copy(p.join(HELPERS_DIR, 'common_instance_config.xml'), config_d_dir)

        # Generate and write macroses file; the 'instance' macro always holds our name.
        macroses = self.macroses.copy()
        macroses['instance'] = self.name
        with open(p.join(config_d_dir, 'macros.xml'), 'w') as macros_config:
            macros_config.write(self.dict_to_xml({"macros" : macroses}))

        # Put ZooKeeper config
        if self.with_zookeeper:
            shutil.copy(p.join(HELPERS_DIR, 'zookeeper_config.xml'), config_d_dir)

        # Copy config dir
        if self.custom_config_dir:
            distutils.dir_util.copy_tree(self.custom_config_dir, configs_dir)

        # Copy config.d configs
        for path in self.custom_main_config_paths:
            shutil.copy(path, config_d_dir)

        # Copy users.d configs
        for path in self.custom_user_config_paths:
            shutil.copy(path, users_d_dir)

        db_dir = p.join(self.path, 'database')
        os.mkdir(db_dir)
        if self.clickhouse_path_dir is not None:
            # Pre-populate /var/lib/clickhouse with user-supplied data/metadata.
            distutils.dir_util.copy_tree(self.clickhouse_path_dir, db_dir)

        logs_dir = p.join(self.path, 'logs')
        os.mkdir(logs_dir)

        depends_on = '[]'
        if self.with_zookeeper:
            depends_on = '["zoo1", "zoo2", "zoo3"]'

        with open(self.docker_compose_path, 'w') as docker_compose:
            docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format(
                name=self.name,
                uid=os.getuid(),
                binary_path=self.server_bin_path,
                configs_dir=configs_dir,
                config_d_dir=config_d_dir,  # currently unused by the template, kept for forward compatibility
                db_dir=db_dir,
                logs_dir=logs_dir,
                depends_on=depends_on))


    def destroy_dir(self):
        """Remove the instance directory and everything in it, if present."""
        if p.exists(self.path):
            shutil.rmtree(self.path)