test.py 12.9 KB
Newer Older
1 2 3 4 5 6
import os.path as p
import time
import datetime
import pytest

from helpers.cluster import ClickHouseCluster
7
from helpers.network import PartitionManager, PartitionManagerDisbaler
8 9 10 11 12 13 14 15
from helpers.test_tools import TSV


def check_all_hosts_sucesfully_executed(tsv_content, num_hosts=None):
    """Assert that an ON CLUSTER answer reports success for every host.

    The answer is parsed with ``TSV.toMat``; the first four columns of each
    row are read as host, port, status code and message.

    :param tsv_content: raw TSV text returned by the query.
    :param num_hosts: expected number of rows / distinct (host, port) pairs;
        defaults to the number of instances in the module-level ``cluster``.
    :raises AssertionError: if the number of rows or of distinct hosts differs
        from ``num_hosts``, or if any status code is not ``"0"``. The raw
        answer is included in the assertion message.
    """
    expected_hosts = len(cluster.instances) if num_hosts is None else num_hosts

    rows = TSV.toMat(tsv_content)
    endpoints = [(row[0], row[1]) for row in rows]  # (host, port)
    statuses = [row[2] for row in rows]
    # Not used by the checks below; indexing it still requires every row to
    # have at least four columns.
    messages = [row[3] for row in rows]

    # Every expected host answered, and each of them exactly once.
    assert len(endpoints) == expected_hosts and len(set(endpoints)) == expected_hosts, "\n" + tsv_content
    # All hosts returned the same status code, and that code is "0".
    assert set(statuses) == {"0"}, "\n" + tsv_content
23 24 25 26 27 28 29


def ddl_check_query(instance, query, num_hosts=None):
    """Run ``query`` on ``instance`` and verify that every host executed it.

    :param instance: object exposing ``query(sql)``.
    :param query: the DDL statement to run.
    :param num_hosts: forwarded to ``check_all_hosts_sucesfully_executed``.
    :returns: the raw answer returned by ``instance.query``.
    """
    answer = instance.query(query)
    check_all_hosts_sucesfully_executed(answer, num_hosts)
    return answer

30
def ddl_check_there_are_no_dublicates(instance):
    """Assert that no tagged DDL query was logged more than once on ``instance``.

    Entries of ``system.query_log`` whose lowercased text starts with
    ``/*ddl_entry=query-`` are grouped by query text; the check looks at the
    largest group.

    :param instance: object exposing ``query(sql)``, ``name`` and ``ip_address``.
    :raises AssertionError: if the answer is non-empty and does not start
        with ``"1"``.
    """
    rows = instance.query("SELECT max(c), argMax(q, c) FROM (SELECT lower(query) AS q, count() AS c FROM system.query_log WHERE type=2 AND q LIKE '/*ddl_entry=query-%' GROUP BY query)")
    # NOTE(review): rows[0][0] is compared as-is; if instance.query returns
    # plain text this only inspects the first character - confirm against
    # the helper's return type.
    # The message has three placeholders; the original supplied only two
    # arguments, so a real duplicate surfaced as IndexError. The raw answer
    # (max count and the offending query) fills the third slot.
    assert len(rows) == 0 or rows[0][0] == "1", \
        "dublicates on {} {}, query {}".format(instance.name, instance.ip_address, rows)
33

34 35 36 37 38 39 40 41 42 43 44 45 46 47
def insert_reliable(instance, query_insert):
    """Run an INSERT, retrying while the failure looks transient.

    A failure is retried when its text contains
    'Unknown status, client must retry' (UNKNOWN_STATUS_OF_INSERT) or
    'zkutil::KeeperException'. At most 100 attempts are made.

    :param instance: object exposing ``query(sql)``.
    :param query_insert: the INSERT statement to run.
    :raises Exception: immediately for any error that is not retryable, or
        the last retryable error once all attempts are used up.
    """
    retryable_markers = ('Unknown status, client must retry', 'zkutil::KeeperException')
    last_exception = None

    for _ in range(100):
        try:
            instance.query(query_insert)
            return
        except Exception as e:
            # Substring tests instead of str.find() truthiness: find()
            # returns -1 (truthy) when the marker is absent and 0 (falsy)
            # when it starts the message, which inverted the original check.
            text = str(e)
            if not any(marker in text for marker in retryable_markers):
                raise
            last_exception = e

    raise last_exception

48 49

# When false, test_replicated_alters returns right after its initial cleanup.
TEST_REPLICATED_ALTERS = True
50 51
# Module-level cluster object used by the helpers, the fixture and the tests
# in this file; it is constructed from this file's path.
cluster = ClickHouseCluster(__file__)

52

53 54 55 56 57 58 59 60 61 62 63 64 65
def replace_domains_to_ip_addresses_in_cluster_config(instances_to_replace):
    """Install a clusters.xml that names hosts by IP address on selected instances.

    Reads ``configs/config.d/clusters.xml`` under ``cluster.base_dir``,
    replaces every instance name of the module-level ``cluster`` with that
    instance's IP address, and writes the result to
    ``/etc/clickhouse-server/config.d/clusters.xml`` inside each listed
    container.

    :param instances_to_replace: names of the instances whose config is
        overwritten.
    :raises KeyError: if a listed name is not an instance of ``cluster``.
    """
    config_path = p.join(cluster.base_dir, 'configs/config.d/clusters.xml')
    # Context manager so the file handle is closed deterministically.
    with open(config_path) as config_file:
        clusters_config = config_file.read()

    for inst_name, inst in cluster.instances.items():
        clusters_config = clusters_config.replace(inst_name, str(inst.ip_address))

    for inst_name in instances_to_replace:
        # The new config is passed through the environment and written by the
        # shell inside the container.
        cluster.instances[inst_name].exec_in_container(
            ['bash', '-c', 'echo "$NEW_CONFIG" > /etc/clickhouse-server/config.d/clusters.xml'],
            environment={"NEW_CONFIG": clusters_config},
            privileged=True)


def init_cluster(cluster):
    """Start the four-instance test cluster and create the shared service objects.

    In order: registers instances ch1..ch4 (shard and replica macros derived
    from the instance index), starts the cluster, rewrites clusters.xml on
    ch1 and ch3, installs two low-probability REJECT rules for ch4's traffic
    on port 2181, then creates the ``all_tables`` table and the ``test``
    database through ON CLUSTER queries issued on ch1.

    Any exception is printed and re-raised.

    :param cluster: the cluster object to populate and start; the installed
        rule set is stored on it as ``pm_random_drops``.
    """
    try:
        # Indices 0..3 map to shards 1,1,2,2 and replicas 1,2,1,2.
        for idx in range(4):
            shard_idx, replica_idx = divmod(idx, 2)
            cluster.add_instance(
                'ch{}'.format(idx + 1),
                config_dir="configs",
                macroses={"layer": 0, "shard": shard_idx + 1, "replica": replica_idx + 1},
                with_zookeeper=True)

        cluster.start()

        # Exercise host definitions given both as DNS names and as IP addresses.
        replace_domains_to_ip_addresses_in_cluster_config(['ch1', 'ch3'])

        # ch4 is the instance used to test CONNECTION_LOSS and server failure.
        victim = cluster.instances['ch4']
        tcp_reset = 'REJECT --reject-with tcp-reset'
        cluster.pm_random_drops = PartitionManager()
        cluster.pm_random_drops._add_rule(
            {'probability': 0.01, 'destination': victim.ip_address, 'source_port': 2181, 'action': tcp_reset})
        cluster.pm_random_drops._add_rule(
            {'probability': 0.01, 'source': victim.ip_address, 'destination_port': 2181, 'action': tcp_reset})

        # Databases and service tables shared by the tests.
        node = cluster.instances['ch1']

        node.query("SELECT 1")
        ddl_check_query(node, """
CREATE TABLE IF NOT EXISTS all_tables ON CLUSTER 'cluster_no_replicas'
    (database String, name String, engine String, metadata_modification_time DateTime)
    ENGINE = Distributed('cluster_no_replicas', 'system', 'tables')
        """)

        ddl_check_query(node, "CREATE DATABASE IF NOT EXISTS test ON CLUSTER 'cluster'")

    except Exception as err:
        print(err)
        raise


@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture that initialises the cluster and yields it.

    After the tests, it drops the ``test`` and ``test2`` databases on the
    cluster, waits 1.5 seconds, and checks every instance's query log for DDL
    queries logged more than once. The firewall rules stored in
    ``cluster.pm_random_drops`` are removed in all cases.
    """
    try:
        init_cluster(cluster)

        yield cluster

        instance = cluster.instances['ch1']
        ddl_check_query(instance, "DROP DATABASE test ON CLUSTER 'cluster'")
        ddl_check_query(instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")

        # Check query log to ensure that DDL queries are not executed twice
        time.sleep(1.5)
        for instance in cluster.instances.values():
            ddl_check_there_are_no_dublicates(instance)

    finally:
        # Remove iptables rules for sacrifice instance. init_cluster may fail
        # before it assigns pm_random_drops; guard the lookup so that failure
        # is not masked by an AttributeError raised here.
        pm_random_drops = getattr(cluster, 'pm_random_drops', None)
        if pm_random_drops is not None:
            pm_random_drops.heal_all()

        # NOTE(review): the original keeps cluster.shutdown() commented out,
        # so this fixture does not shut the cluster down.
        #cluster.shutdown()
123 124 125 126 127


def test_default_database(started_cluster):
    """Create an unqualified table on 'cluster2' and check which database it lands in.

    The expected answer places the table in ``default`` on ch1 and ch3 and in
    ``test2`` on ch2 and ch4.
    """
    node = cluster.instances['ch3']

    setup_statements = (
        "CREATE DATABASE IF NOT EXISTS test2 ON CLUSTER 'cluster' FORMAT TSV",
        "DROP TABLE IF EXISTS null ON CLUSTER 'cluster' FORMAT TSV",
        "CREATE TABLE null ON CLUSTER 'cluster2' (s String DEFAULT 'escape\t\nme') ENGINE = Null",
    )
    for statement in setup_statements:
        ddl_check_query(node, statement)

    placement = node.query("SELECT hostName() AS h, database FROM all_tables WHERE name = 'null' ORDER BY h")
    assert TSV(placement) == TSV("ch1\tdefault\nch2\ttest2\nch3\tdefault\nch4\ttest2\n")

    cleanup_statements = (
        "DROP TABLE IF EXISTS null ON CLUSTER cluster2",
        "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'",
    )
    for statement in cleanup_statements:
        ddl_check_query(node, statement)


139 140
def test_create_view(started_cluster):
    """Create and drop views, then create, rename and drop a table, all ON CLUSTER.

    Every statement is issued on ch3 and must succeed on all hosts.
    """
    instance = cluster.instances['ch3']
    ddl_check_query(instance, "CREATE VIEW test.super_simple_view ON CLUSTER 'cluster' AS SELECT * FROM system.numbers FORMAT TSV")
    ddl_check_query(instance, "CREATE MATERIALIZED VIEW test.simple_mat_view ON CLUSTER 'cluster' ENGINE = Memory AS SELECT * FROM system.numbers FORMAT TSV")
    ddl_check_query(instance, "DROP TABLE test.simple_mat_view ON CLUSTER 'cluster' FORMAT TSV")
    # Drop the view that was actually created above; the original targeted
    # test.super_simple_view2, which this test never creates.
    ddl_check_query(instance, "DROP TABLE test.super_simple_view ON CLUSTER 'cluster' FORMAT TSV")

    # ON CLUSTER goes before the column list and an ENGINE is given, matching
    # every other CREATE TABLE statement in this file.
    ddl_check_query(instance, "CREATE TABLE test.super_simple ON CLUSTER 'cluster' (i Int8) ENGINE = Memory")
    ddl_check_query(instance, "RENAME TABLE test.super_simple TO test.super_simple2 ON CLUSTER 'cluster' FORMAT TSV")
    ddl_check_query(instance, "DROP TABLE test.super_simple2 ON CLUSTER 'cluster'")
149 150


151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
def test_on_server_fail(started_cluster):
    """Issue an ON CLUSTER CREATE while ch2's container is stopped.

    The container is started again right after the request is issued; the
    request's answer must then report success on every host and the table
    must be visible on ch1..ch4.
    """
    initiator = cluster.instances['ch1']
    stopped_node = cluster.instances['ch2']

    ddl_check_query(initiator, "DROP TABLE IF EXISTS test.test_server_fail ON CLUSTER 'cluster'")

    # The request is created between stop() and start() of ch2's container.
    stopped_node.get_docker_handle().stop()
    pending = initiator.get_query_request(
        "CREATE TABLE test.test_server_fail ON CLUSTER 'cluster' (i Int8) ENGINE=Null",
        timeout=30)
    stopped_node.get_docker_handle().start()

    ddl_check_query(initiator, "DROP TABLE IF EXISTS test.__nope__ ON CLUSTER 'cluster'")

    # The CREATE itself must have succeeded everywhere ...
    check_all_hosts_sucesfully_executed(pending.get_answer())

    # ... and its result must exist on every host.
    hosts_with_table = initiator.query(
        "SELECT hostName() AS h FROM all_tables WHERE database='test' AND name='test_server_fail' ORDER BY h")
    assert TSV(hosts_with_table) == TSV("ch1\nch2\nch3\nch4\n")

    ddl_check_query(initiator, "DROP TABLE test.test_server_fail ON CLUSTER 'cluster'")
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203


def _test_on_connection_losses(cluster, zk_timeout):
    """Run an ON CLUSTER query while ch2's ZooKeeper connections are dropped.

    :param cluster: cluster whose ch1 issues the query and whose ch2 is cut off.
    :param zk_timeout: seconds to keep the connections dropped before
        restoring them.
    """
    initiator = cluster.instances['ch1']
    isolated_node = cluster.instances['ch2']

    with PartitionManager() as partitions:
        partitions.drop_instance_zk_connections(isolated_node)
        pending = initiator.get_query_request(
            "DROP TABLE IF EXISTS test.__nope__ ON CLUSTER 'cluster'",
            timeout=10)
        time.sleep(zk_timeout)
        partitions.restore_instance_zk_connections(isolated_node)

    # The answer is collected only after the partition manager has exited.
    check_all_hosts_sucesfully_executed(pending.get_answer())


def test_on_connection_loss(started_cluster):
    """Drop ZooKeeper connections for 1.5 seconds.

    Per the original note, the ZK timeout in the config is 3 seconds, so only
    a connection loss is expected to occur.
    """
    _test_on_connection_losses(cluster, zk_timeout=1.5)


def test_on_session_expired(started_cluster):
    """Drop ZooKeeper connections for 4 seconds.

    Per the original note, the ZK timeout in the config is 3 seconds, so the
    session should expire.
    """
    _test_on_connection_losses(cluster, zk_timeout=4)


def test_replicated_alters(started_cluster):
    """Run ON CLUSTER ALTERs against a ReplicatedMergeTree table behind Distributed tables.

    Creates ``merge`` plus the Distributed tables ``all_merge_32`` and
    ``all_merge_64``, inserts rows on every instance, widens column ``i``,
    adds column ``s``, inserts more rows with ``p = 31`` and finally detaches
    partition 197002. After the detach, only the first four rows are expected
    to remain.

    The random ZooKeeper packet drops are disabled while ``merge`` exists and
    are restored even if a step in between fails. If ``TEST_REPLICATED_ALTERS``
    is false, the test returns after its initial cleanup.
    """
    instance = cluster.instances['ch2']

    ddl_check_query(instance, "DROP TABLE IF EXISTS merge ON CLUSTER cluster")
    ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster")
    ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster")

    if not TEST_REPLICATED_ALTERS:
        return

    # Temporarily disable random ZK packet drops; they might break creation
    # of ReplicatedMergeTree replicas.
    firewall_drops_rules = cluster.pm_random_drops.pop_rules()
    try:
        ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS merge ON CLUSTER cluster (p Date, i Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', p, p, 1)
""")

        ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_32 ON CLUSTER cluster (p Date, i Int32)
ENGINE = Distributed(cluster, default, merge, i)
""")
        ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_64 ON CLUSTER cluster (p Date, i Int64, s String)
ENGINE = Distributed(cluster, default, merge, i)
""")

        # Each instance inserts the two values k and k+1; k is 0 for ch1/ch2
        # and 2 for ch3/ch4.
        for i in range(4):
            k = (i // 2) * 2
            insert_reliable(cluster.instances['ch{}'.format(i + 1)], "INSERT INTO merge (i) VALUES ({})({})".format(k, k+1))

        assert TSV(instance.query("SELECT i FROM all_merge_32 ORDER BY i")) == TSV(''.join(['{}\n'.format(x) for x in range(4)]))

        ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster MODIFY COLUMN i Int64")
        ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster ADD COLUMN s DEFAULT toString(i)")

        assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in range(4)]))

        # Second batch: values 4..7, all inserted with p = 31.
        for i in range(4):
            k = (i // 2) * 2 + 4
            insert_reliable(cluster.instances['ch{}'.format(i + 1)], "INSERT INTO merge (p, i) VALUES (31, {})(31, {})".format(k, k+1))

        assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in range(8)]))

        ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster DETACH PARTITION 197002")
        assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in range(4)]))

        ddl_check_query(instance, "DROP TABLE merge ON CLUSTER cluster")
    finally:
        # Re-enable random ZK packet drops, also when a step above failed, so
        # that later tests in the module run with the rules in place.
        cluster.pm_random_drops.push_rules(firewall_drops_rules)

    ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster")
    ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster")


def test_simple_alters(started_cluster):
    """Run ON CLUSTER ALTERs against a MergeTree table behind Distributed tables.

    Uses 'cluster_without_replication': creates ``merge`` plus the
    Distributed tables ``all_merge_32`` and ``all_merge_64``, inserts rows on
    every instance, widens column ``i``, adds column ``s``, inserts more rows
    with ``p = 31`` and detaches partition 197002. After the detach, only the
    first four rows are expected to remain.
    """
    node = cluster.instances['ch2']

    def insert_pairs(statement_template, offset):
        # Each instance inserts base and base+1; base is offset for ch1/ch2
        # and offset + 2 for ch3/ch4.
        for idx in range(4):
            base = (idx // 2) * 2 + offset
            cluster.instances['ch{}'.format(idx + 1)].query(statement_template.format(base, base+1))

    def expected_wide_rows(count):
        # Rows "x<TAB>x" for x in 0..count-1, one per line.
        return TSV(''.join('{}\t{}\n'.format(x, x) for x in range(count)))

    for statement in (
            "DROP TABLE IF EXISTS merge ON CLUSTER cluster_without_replication",
            "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster_without_replication",
            "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster_without_replication",
            """
CREATE TABLE IF NOT EXISTS merge ON CLUSTER cluster_without_replication (p Date, i Int32)
ENGINE = MergeTree(p, p, 1)
""",
            """
CREATE TABLE IF NOT EXISTS all_merge_32 ON CLUSTER cluster_without_replication (p Date, i Int32)
ENGINE = Distributed(cluster_without_replication, default, merge, i)
""",
            """
CREATE TABLE IF NOT EXISTS all_merge_64 ON CLUSTER cluster_without_replication (p Date, i Int64, s String)
ENGINE = Distributed(cluster_without_replication, default, merge, i)
""",
    ):
        ddl_check_query(node, statement)

    insert_pairs("INSERT INTO merge (i) VALUES ({})({})", 0)

    narrow_rows = TSV(node.query("SELECT i FROM all_merge_32 ORDER BY i"))
    assert narrow_rows == TSV(''.join('{}\n'.format(x) for x in range(4)))

    ddl_check_query(node, "ALTER TABLE merge ON CLUSTER cluster_without_replication MODIFY COLUMN i Int64")
    ddl_check_query(node, "ALTER TABLE merge ON CLUSTER cluster_without_replication ADD COLUMN s DEFAULT toString(i) FORMAT TSV")

    assert TSV(node.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == expected_wide_rows(4)

    # Second batch: values 4..7, all inserted with p = 31.
    insert_pairs("INSERT INTO merge (p, i) VALUES (31, {})(31, {})", 4)

    assert TSV(node.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == expected_wide_rows(8)

    ddl_check_query(node, "ALTER TABLE merge ON CLUSTER cluster_without_replication DETACH PARTITION 197002")
    assert TSV(node.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == expected_wide_rows(4)

    for statement in (
            "DROP TABLE merge ON CLUSTER cluster_without_replication",
            "DROP TABLE all_merge_32 ON CLUSTER cluster_without_replication",
            "DROP TABLE all_merge_64 ON CLUSTER cluster_without_replication",
    ):
        ddl_check_query(node, statement)