test.py
import os
import os.path as p
import sys
import time
import datetime
import pytest
from contextlib import contextmanager

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager, PartitionManagerDisbaler
from helpers.test_tools import TSV


def check_all_hosts_sucesfully_executed(tsv_content, num_hosts=None):
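    # The TSV returned by an ON CLUSTER query is expected to contain one row per host,
    # where columns 0-3 are (host, port, status code, message); the check asserts that
    # every host reported exactly once and that all status codes are 0.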
    if num_hosts is None:
        num_hosts = len(cluster.instances)

    M = TSV.toMat(tsv_content)
    hosts = [(l[0], l[1]) for l in M] # (host, port)
    codes = [l[2] for l in M]
    messages = [l[3] for l in M]

    assert len(hosts) == num_hosts and len(set(hosts)) == num_hosts, "\n" + tsv_content
    assert len(set(codes)) == 1, "\n" + tsv_content
    assert codes[0] == "0", "\n" + tsv_content


def ddl_check_query(instance, query, num_hosts=None):
    contents = instance.query(query)
    check_all_hosts_sucesfully_executed(contents, num_hosts)
    return contents

def ddl_check_there_are_no_dublicates(instance):
    rows = TSV.toMat(instance.query("SELECT max(c), argMax(q, c) FROM (SELECT lower(query) AS q, count() AS c FROM system.query_log WHERE type=2 AND q LIKE '/* ddl_entry=query-%' GROUP BY query)"))
    assert len(rows) > 0 and rows[0][0] == "1", "duplicates on {} {}, query {}".format(instance.name, instance.ip_address, rows)

# Make retries in case of UNKNOWN_STATUS_OF_INSERT or zkutil::KeeperException errors
def insert_reliable(instance, query_insert):
    for i in xrange(100):
        try:
            instance.query(query_insert)
            return
        except Exception as e:
            last_exception = e
            s = str(e)
            if not (s.find('Unknown status, client must retry') >= 0 or s.find('zkutil::KeeperException') >= 0):
                raise e

    raise last_exception


TEST_REPLICATED_ALTERS = True
cluster = ClickHouseCluster(__file__)


def replace_domains_to_ip_addresses_in_cluster_config(instances_to_replace):
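    # Rewrite every instance hostname in clusters.xml to its IP address and push the
    # patched config to the instances listed, so the cluster definition is tested with
    # hosts given as IP addresses instead of DNS names.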
    clusters_config = open(p.join(cluster.base_dir, 'configs/config.d/clusters.xml')).read()

    for inst_name, inst in cluster.instances.items():
        clusters_config = clusters_config.replace(inst_name, str(inst.ip_address))

    for inst_name in instances_to_replace:
        inst = cluster.instances[inst_name]
        inst.exec_in_container(['bash', '-c', 'echo "$NEW_CONFIG" > /etc/clickhouse-server/config.d/clusters.xml'], environment={"NEW_CONFIG": clusters_config}, privileged=True)
        # print cluster.instances[inst_name].exec_in_container(['cat', "/etc/clickhouse-server/config.d/clusters.xml"])


def init_cluster(cluster):
    try:
        for i in xrange(4):
            cluster.add_instance(
                'ch{}'.format(i+1),
                config_dir="configs",
                macroses={"layer": 0, "shard": i/2 + 1, "replica": i%2 + 1},
                with_zookeeper=True)

        cluster.start()

        # Replace config files for testing ability to set host in DNS and IP formats
        replace_domains_to_ip_addresses_in_cluster_config(['ch1', 'ch3'])

        # Select sacrifice instance to test CONNECTION_LOSS and server fail on it
        sacrifice = cluster.instances['ch4']
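        # Randomly reject ~1% of ZooKeeper packets (port 2181) to and from the sacrifice
        # instance, so DDL queries get exercised under intermittent connection errors.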
        cluster.pm_random_drops = PartitionManager()
        cluster.pm_random_drops._add_rule({'probability': 0.01, 'destination': sacrifice.ip_address, 'source_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
        cluster.pm_random_drops._add_rule({'probability': 0.01, 'source': sacrifice.ip_address, 'destination_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})

        # Initialize databases and service tables
        instance = cluster.instances['ch1']

        instance.query("SELECT 1")
        ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_tables ON CLUSTER 'cluster_no_replicas'
    (database String, name String, engine String, metadata_modification_time DateTime)
    ENGINE = Distributed('cluster_no_replicas', 'system', 'tables')
        """)

        ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test ON CLUSTER 'cluster'")

    except Exception as e:
        print e
        raise


@pytest.fixture(scope="module")
def started_cluster():
    try:
        init_cluster(cluster)

        yield cluster

        instance = cluster.instances['ch1']
        ddl_check_query(instance, "DROP DATABASE test ON CLUSTER 'cluster'")
        ddl_check_query(instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")

        # Check query log to ensure that DDL queries are not executed twice
        time.sleep(1.5)
        for instance in cluster.instances.values():
            ddl_check_there_are_no_dublicates(instance)

    finally:
        cluster.shutdown()


def test_default_database(started_cluster):
    instance = cluster.instances['ch3']

    ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test2 ON CLUSTER 'cluster' FORMAT TSV")
    ddl_check_query(instance, "DROP TABLE IF EXISTS null ON CLUSTER 'cluster' FORMAT TSV")
    ddl_check_query(instance, "CREATE TABLE null ON CLUSTER 'cluster2' (s String DEFAULT 'escape\t\nme') ENGINE = Null")

    contents = instance.query("SELECT hostName() AS h, database FROM all_tables WHERE name = 'null' ORDER BY h")
    assert TSV(contents) == TSV("ch1\tdefault\nch2\ttest2\nch3\tdefault\nch4\ttest2\n")

    ddl_check_query(instance, "DROP TABLE IF EXISTS null ON CLUSTER cluster2")
    ddl_check_query(instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")


def test_create_view(started_cluster):
    instance = cluster.instances['ch3']
    ddl_check_query(instance, "CREATE VIEW test.super_simple_view ON CLUSTER 'cluster' AS SELECT * FROM system.numbers FORMAT TSV")
    ddl_check_query(instance, "CREATE MATERIALIZED VIEW test.simple_mat_view ON CLUSTER 'cluster' ENGINE = Memory AS SELECT * FROM system.numbers FORMAT TSV")
    ddl_check_query(instance, "DROP TABLE test.simple_mat_view ON CLUSTER 'cluster' FORMAT TSV")
    ddl_check_query(instance, "DROP TABLE IF EXISTS test.super_simple_view2 ON CLUSTER 'cluster' FORMAT TSV")

    ddl_check_query(instance, "CREATE TABLE test.super_simple ON CLUSTER 'cluster' (i Int8) ENGINE = Memory")
    ddl_check_query(instance, "RENAME TABLE test.super_simple TO test.super_simple2 ON CLUSTER 'cluster' FORMAT TSV")
    ddl_check_query(instance, "DROP TABLE test.super_simple2 ON CLUSTER 'cluster'")


def test_on_server_fail(started_cluster):
    instance = cluster.instances['ch1']
    kill_instance = cluster.instances['ch2']

    ddl_check_query(instance, "DROP TABLE IF EXISTS test.test_server_fail ON CLUSTER 'cluster'")

    kill_instance.get_docker_handle().stop()
    request = instance.get_query_request("CREATE TABLE test.test_server_fail ON CLUSTER 'cluster' (i Int8) ENGINE=Null", timeout=30)
    kill_instance.get_docker_handle().start()

    ddl_check_query(instance, "DROP TABLE IF EXISTS test.__nope__ ON CLUSTER 'cluster'")

    # Check query itself
    check_all_hosts_sucesfully_executed(request.get_answer())

    # And check query artefacts
    contents = instance.query("SELECT hostName() AS h FROM all_tables WHERE database='test' AND name='test_server_fail' ORDER BY h")
    assert TSV(contents) == TSV("ch1\nch2\nch3\nch4\n")

    ddl_check_query(instance, "DROP TABLE test.test_server_fail ON CLUSTER 'cluster'")


def _test_on_connection_losses(cluster, zk_timeout):
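    # Cut the ZooKeeper connection of one replica for zk_timeout seconds while a distributed
    # DDL query is in flight, then check that the query still succeeds on all hosts.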
    instance = cluster.instances['ch1']
    kill_instance = cluster.instances['ch2']

    with PartitionManager() as pm:
        pm.drop_instance_zk_connections(kill_instance)
        request = instance.get_query_request("DROP TABLE IF EXISTS test.__nope__ ON CLUSTER 'cluster'", timeout=10)
        time.sleep(zk_timeout)
        pm.restore_instance_zk_connections(kill_instance)

    check_all_hosts_sucesfully_executed(request.get_answer())


def test_on_connection_loss(started_cluster):
    _test_on_connection_losses(cluster, 1.5) # only a connection loss should occur, the session should survive (3 sec ZK timeout in config)


def test_on_session_expired(started_cluster):
    _test_on_connection_losses(cluster, 4) # the session should expire (3 sec ZK timeout in config)


def test_replicated_alters(started_cluster):
    instance = cluster.instances['ch2']

    ddl_check_query(instance, "DROP TABLE IF EXISTS merge ON CLUSTER cluster")
    ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster")
    ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster")

    if not TEST_REPLICATED_ALTERS:
        return

    # Temporarily disable random ZK packet drops, they might break creation of ReplicatedMergeTree replicas
    firewall_drops_rules = cluster.pm_random_drops.pop_rules()

    ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS merge ON CLUSTER cluster (p Date, i Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', p, p, 1)
""")

    ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_32 ON CLUSTER cluster (p Date, i Int32)
ENGINE = Distributed(cluster, default, merge, i)
""")
    ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_64 ON CLUSTER cluster (p Date, i Int64, s String)
ENGINE = Distributed(cluster, default, merge, i)
""")

    for i in xrange(4):
        k = (i / 2) * 2
        insert_reliable(cluster.instances['ch{}'.format(i + 1)], "INSERT INTO merge (i) VALUES ({})({})".format(k, k+1))

    assert TSV(instance.query("SELECT i FROM all_merge_32 ORDER BY i")) == TSV(''.join(['{}\n'.format(x) for x in xrange(4)]))


    ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster MODIFY COLUMN i Int64")
    ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster ADD COLUMN s DEFAULT toString(i)")

    assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))


    for i in xrange(4):
        k = (i / 2) * 2 + 4
        insert_reliable(cluster.instances['ch{}'.format(i + 1)], "INSERT INTO merge (p, i) VALUES (31, {})(31, {})".format(k, k+1))

    assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(8)]))


    ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster DETACH PARTITION 197002")
    assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))

    ddl_check_query(instance, "DROP TABLE merge ON CLUSTER cluster")

    # Re-enable random ZK packet drops
    cluster.pm_random_drops.push_rules(firewall_drops_rules)

    ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster")
    ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster")


def test_simple_alters(started_cluster):
    instance = cluster.instances['ch2']

    ddl_check_query(instance, "DROP TABLE IF EXISTS merge ON CLUSTER cluster_without_replication")
    ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster_without_replication")
    ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster_without_replication")

    ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS merge ON CLUSTER cluster_without_replication (p Date, i Int32)
ENGINE = MergeTree(p, p, 1)
""")
    ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_32 ON CLUSTER cluster_without_replication (p Date, i Int32)
ENGINE = Distributed(cluster_without_replication, default, merge, i)
""")
    ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_64 ON CLUSTER cluster_without_replication (p Date, i Int64, s String)
ENGINE = Distributed(cluster_without_replication, default, merge, i)
""")

    for i in xrange(4):
        k = (i / 2) * 2
        cluster.instances['ch{}'.format(i + 1)].query("INSERT INTO merge (i) VALUES ({})({})".format(k, k+1))

    assert TSV(instance.query("SELECT i FROM all_merge_32 ORDER BY i")) == TSV(''.join(['{}\n'.format(x) for x in xrange(4)]))


    ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication MODIFY COLUMN i Int64")
    ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication ADD COLUMN s DEFAULT toString(i) FORMAT TSV")

    assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))


    for i in xrange(4):
        k = (i / 2) * 2 + 4
        cluster.instances['ch{}'.format(i + 1)].query("INSERT INTO merge (p, i) VALUES (31, {})(31, {})".format(k, k+1))

    assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(8)]))


    ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication DETACH PARTITION 197002")
    assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))

    ddl_check_query(instance, "DROP TABLE merge ON CLUSTER cluster_without_replication")
    ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster_without_replication")
    ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster_without_replication")


if __name__ == '__main__':
    with contextmanager(started_cluster)() as cluster:
        for name, instance in cluster.instances.items():
            print name, instance.ip_address
        raw_input("Cluster created, press any key to destroy...")