Commit 2ee08219 authored by Vladimir Chebotarev

Added tests for reloading storage configuration.

Parent 2bd64743
......@@ -880,19 +880,19 @@ class ClickHouseInstance:
# used by all utils with any config
conf_d_dir = p.abspath(p.join(configs_dir, 'conf.d'))
# used by server with main config.xml
config_d_dir = p.abspath(p.join(configs_dir, 'config.d'))
self.config_d_dir = p.abspath(p.join(configs_dir, 'config.d'))
users_d_dir = p.abspath(p.join(configs_dir, 'users.d'))
os.mkdir(conf_d_dir)
os.mkdir(config_d_dir)
os.mkdir(self.config_d_dir)
os.mkdir(users_d_dir)
# The file is named with 0_ prefix to be processed before other configuration overloads.
shutil.copy(p.join(HELPERS_DIR, '0_common_instance_config.xml'), config_d_dir)
shutil.copy(p.join(HELPERS_DIR, '0_common_instance_config.xml'), self.config_d_dir)
# Generate and write macros file
macros = self.macros.copy()
macros['instance'] = self.name
with open(p.join(config_d_dir, 'macros.xml'), 'w') as macros_config:
with open(p.join(self.config_d_dir, 'macros.xml'), 'w') as macros_config:
macros_config.write(self.dict_to_xml({"macros": macros}))
# Put ZooKeeper config
......@@ -905,7 +905,7 @@ class ClickHouseInstance:
# Copy config.d configs
for path in self.custom_main_config_paths:
shutil.copy(path, config_d_dir)
shutil.copy(path, self.config_d_dir)
# Copy users.d configs
for path in self.custom_user_config_paths:
......@@ -976,7 +976,7 @@ class ClickHouseInstance:
binary_volume=binary_volume,
odbc_bridge_volume=odbc_bridge_volume,
configs_dir=configs_dir,
config_d_dir=config_d_dir,
config_d_dir=self.config_d_dir,
db_dir=db_dir,
tmpfs=str(self.tmpfs),
logs_dir=logs_dir,
......
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>
\ No newline at end of file
<yandex>
<storage_configuration>
<disks>
<default>
<keep_free_space_bytes>1024</keep_free_space_bytes>
</default>
<jbod1>
<path>/jbod1/</path>
</jbod1>
<jbod2>
<path>/jbod2/</path>
<keep_free_space_bytes>10485760</keep_free_space_bytes>
<!-- 10MB -->
</jbod2>
<external>
<path>/external/</path>
</external>
</disks>
<policies>
<small_jbod_with_external>
<volumes>
<main>
<disk>jbod1</disk>
</main>
<external>
<disk>external</disk>
</external>
</volumes>
</small_jbod_with_external>
<one_more_small_jbod_with_external>
<volumes>
<m>
<disk>jbod1</disk>
</m>
<e>
<disk>external</disk>
</e>
</volumes>
</one_more_small_jbod_with_external>
<!-- store on JBOD by default (round-robin), store big parts on external -->
<jbods_with_external>
<volumes>
<main>
<disk>jbod1</disk>
<disk>jbod2</disk>
<max_data_part_size_bytes>10485760</max_data_part_size_bytes>
<!-- 10MB -->
</main>
<external>
<disk>external</disk>
</external>
</volumes>
</jbods_with_external>
<!-- Moving all parts jbod1 if acquired more than 70% -->
<moving_jbod_with_external>
<volumes>
<main>
<disk>jbod1</disk>
</main>
<external>
<disk>external</disk>
</external>
</volumes>
<move_factor>0.7</move_factor>
</moving_jbod_with_external>
<!-- store local by default, store big parts on external -->
<default_disk_with_external>
<volumes>
<small>
<disk>default</disk>
<max_data_part_size_bytes>2097152</max_data_part_size_bytes>
<!-- 2MB -->
</small>
<big>
<disk>external</disk>
<max_data_part_size_bytes>20971520</max_data_part_size_bytes>
<!-- 20MB -->
</big>
</volumes>
</default_disk_with_external>
<!-- special policy for checking validation of `max_data_part_size` -->
<special_warning_policy>
<volumes>
<special_warning_zero_volume>
<disk>default</disk>
<max_data_part_size_bytes>0</max_data_part_size_bytes>
</special_warning_zero_volume>
<special_warning_default_volume>
<disk>external</disk>
</special_warning_default_volume>
<special_warning_small_volume>
<disk>jbod1</disk>
<max_data_part_size_bytes>1024</max_data_part_size_bytes>
</special_warning_small_volume>
<special_warning_big_volume>
<disk>jbod2</disk>
<max_data_part_size_bytes>1024000000</max_data_part_size_bytes>
</special_warning_big_volume>
</volumes>
</special_warning_policy>
</policies>
</storage_configuration>
</yandex>
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
<part_log>
<database>system</database>
<table>part_log</table>
<flush_interval_milliseconds>500</flush_interval_milliseconds>
</part_log>
</yandex>
import collections
import os
import re
import shutil
import time
import xml.etree.ElementTree as ET
import pytest
import helpers.client
import helpers.cluster
cluster = helpers.cluster.ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1',
config_dir='configs',
main_configs=['configs/logs_config.xml'],
with_zookeeper=True,
stay_alive=True,
tmpfs=['/jbod1:size=40M', '/jbod2:size=40M', '/jbod3:size=40M', '/jbod4:size=40M', '/external:size=200M'],
macros={"shard": 0, "replica": 1} )
node2 = cluster.add_instance('node2',
config_dir='configs',
main_configs=['configs/logs_config.xml'],
with_zookeeper=True,
stay_alive=True,
tmpfs=['/jbod1:size=40M', '/jbod2:size=40M', '/jbod3:size=40M', '/jbod4:size=40M', '/external:size=200M'],
macros={"shard": 0, "replica": 2} )
def get_log(node):
return node.exec_in_container(["bash", "-c", "cat /var/log/clickhouse-server/clickhouse-server.log"])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def start_over():
shutil.copy(os.path.join(os.path.dirname(__file__), "configs/config.d/storage_configuration.xml"), os.path.join(node1.config_d_dir, "storage_configuration.xml"))
for node in (node1, node2):
separate_configuration_path = os.path.join(node.config_d_dir, "separate_configuration.xml")
try:
os.remove(separate_configuration_path)
except:
""""""
def add_disk(node, name, path, separate_file=False):
separate_configuration_path = os.path.join(node.config_d_dir, "separate_configuration.xml")
try:
if separate_file:
tree = ET.parse(separate_configuration_path)
else:
tree = ET.parse(os.path.join(node.config_d_dir, "storage_configuration.xml"))
except:
tree = ET.ElementTree(ET.fromstring('<yandex><storage_configuration><disks/><policies/></storage_configuration></yandex>'))
root = tree.getroot()
new_disk = ET.Element(name)
new_path = ET.Element("path")
new_path.text = path
new_disk.append(new_path)
root.find("storage_configuration").find("disks").append(new_disk)
if separate_file:
tree.write(separate_configuration_path)
else:
tree.write(os.path.join(node.config_d_dir, "storage_configuration.xml"))
def add_policy(node, name, volumes):
tree = ET.parse(os.path.join(node.config_d_dir, "storage_configuration.xml"))
root = tree.getroot()
new_policy = ET.Element(name)
new_volumes = ET.Element("volumes")
for volume, disks in volumes.items():
new_volume = ET.Element(volume)
for disk in disks:
new_disk = ET.Element("disk")
new_disk.text = disk
new_volume.append(new_disk)
new_volumes.append(new_volume)
new_policy.append(new_volumes)
root.find("storage_configuration").find("policies").append(new_policy)
tree.write(os.path.join(node.config_d_dir, "storage_configuration.xml"))
def test_add_disk(started_cluster):
try:
name = "test_add_disk"
engine = "MergeTree()"
start_over()
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
assert "jbod3" not in set(node1.query("SELECT name FROM system.disks").splitlines())
add_disk(node1, "jbod3", "/jbod3/")
node1.query("SYSTEM RELOAD CONFIG")
assert "jbod3" in set(node1.query("SELECT name FROM system.disks").splitlines())
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_add_disk_to_separate_config(started_cluster):
try:
name = "test_add_disk"
engine = "MergeTree()"
start_over()
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
assert "jbod3" not in set(node1.query("SELECT name FROM system.disks").splitlines())
add_disk(node1, "jbod3", "/jbod3/", separate_file=True)
node1.query("SYSTEM RELOAD CONFIG")
assert "jbod3" in set(node1.query("SELECT name FROM system.disks").splitlines())
start_over()
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_add_policy(started_cluster):
try:
name = "test_add_policy"
engine = "MergeTree()"
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
add_policy(node1, "cool_policy", {"volume1": ["jbod3", "jbod4"]})
node1.query("SYSTEM RELOAD CONFIG")
disks = set(node1.query("SELECT name FROM system.disks").splitlines())
assert "cool_policy" in set(node1.query("SELECT policy_name FROM system.storage_policies").splitlines())
assert {"volume1"} == set(node1.query("SELECT volume_name FROM system.storage_policies WHERE policy_name = 'cool_policy'").splitlines())
assert {"['jbod3','jbod4']"} == set(node1.query("SELECT disks FROM system.storage_policies WHERE policy_name = 'cool_policy'").splitlines())
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_new_policy_works(started_cluster):
try:
name = "test_new_policy_works"
engine = "MergeTree()"
start_over()
add_disk(node1, "jbod3", "/jbod3/")
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
add_policy(node1, "cool_policy", {"volume1": ["jbod3"]})
node1.query("SYSTEM RELOAD CONFIG")
# Incompatible storage policy.
with pytest.raises(helpers.client.QueryRuntimeException):
node1.query("""
ALTER TABLE {name} MODIFY SETTING storage_policy='cool_policy'
""".format(name=name))
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "cool_policy", collections.OrderedDict([("volume1", ["jbod3"]), ("main", ["jbod1", "jbod2"]), ("external", ["external"])]))
node1.query("SYSTEM RELOAD CONFIG")
node1.query("""
ALTER TABLE {name} MODIFY SETTING storage_policy='cool_policy'
""".format(name=name))
node1.query("""
INSERT INTO TABLE {name} VALUES (1)
""".format(name=name))
assert {"jbod3"} == set(node1.query("SELECT disk_name FROM system.parts WHERE active = 1 AND table = '{name}'".format(name=name)).splitlines())
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_add_volume_to_policy(started_cluster):
try:
name = "test_add_volume_to_policy"
engine = "MergeTree()"
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "cool_policy", {"volume1": ["jbod3"]})
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "cool_policy", collections.OrderedDict([("volume1", ["jbod3"]), ("volume2", ["jbod4"])]))
node1.query("SYSTEM RELOAD CONFIG")
volumes = set(node1.query("SELECT volume_name FROM system.storage_policies WHERE policy_name = 'cool_policy'").splitlines())
disks_sets = set(node1.query("SELECT disks FROM system.storage_policies WHERE policy_name = 'cool_policy'").splitlines())
assert {"volume1", "volume2"} == volumes
assert {"['jbod3']", "['jbod4']"} == disks_sets
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_add_disk_to_policy(started_cluster):
try:
name = "test_add_disk_to_policy"
engine = "MergeTree()"
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "cool_policy", {"volume1": ["jbod3"]})
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "cool_policy", {"volume1": ["jbod3","jbod4"]})
node1.query("SYSTEM RELOAD CONFIG")
volumes = set(node1.query("SELECT volume_name FROM system.storage_policies WHERE policy_name = 'cool_policy'").splitlines())
disks_sets = set(node1.query("SELECT disks FROM system.storage_policies WHERE policy_name = 'cool_policy'").splitlines())
assert {"volume1"} == volumes
assert {"['jbod3','jbod4']"} == disks_sets
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_remove_disk(started_cluster):
try:
name = "test_remove_disk"
engine = "MergeTree()"
start_over()
add_disk(node1, "remove_disk_jbod3", "/jbod3/")
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
assert "remove_disk_jbod3" in set(node1.query("SELECT name FROM system.disks").splitlines())
start_over()
node1.query("SYSTEM RELOAD CONFIG")
assert "remove_disk_jbod3" in set(node1.query("SELECT name FROM system.disks").splitlines())
assert re.search("Warning.*remove_disk_jbod3", get_log(node1))
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_remove_policy(started_cluster):
try:
name = "test_remove_policy"
engine = "MergeTree()"
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "remove_policy_cool_policy", {"volume1": ["jbod3", "jbod4"]})
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
assert "remove_policy_cool_policy" in set(node1.query("SELECT policy_name FROM system.storage_policies").splitlines())
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
node1.query("SYSTEM RELOAD CONFIG")
assert "remove_policy_cool_policy" in set(node1.query("SELECT policy_name FROM system.storage_policies").splitlines())
assert re.search("Error.*remove_policy_cool_policy", get_log(node1))
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_remove_volume_from_policy(started_cluster):
try:
name = "test_remove_volume_from_policy"
engine = "MergeTree()"
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "test_remove_volume_from_policy_cool_policy", collections.OrderedDict([("volume1", ["jbod3"]), ("volume2", ["jbod4"])]))
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
volumes = set(node1.query("SELECT volume_name FROM system.storage_policies WHERE policy_name = 'test_remove_volume_from_policy_cool_policy'").splitlines())
disks_sets = set(node1.query("SELECT disks FROM system.storage_policies WHERE policy_name = 'test_remove_volume_from_policy_cool_policy'").splitlines())
assert {"volume1", "volume2"} == volumes
assert {"['jbod3']", "['jbod4']"} == disks_sets
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "cool_policy", {"volume1": ["jbod3"]})
node1.query("SYSTEM RELOAD CONFIG")
volumes = set(node1.query("SELECT volume_name FROM system.storage_policies WHERE policy_name = 'test_remove_volume_from_policy_cool_policy'").splitlines())
disks_sets = set(node1.query("SELECT disks FROM system.storage_policies WHERE policy_name = 'test_remove_volume_from_policy_cool_policy'").splitlines())
assert {"volume1", "volume2"} == volumes
assert {"['jbod3']", "['jbod4']"} == disks_sets
assert re.search("Error.*test_remove_volume_from_policy_cool_policy", get_log(node1))
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_remove_disk_from_policy(started_cluster):
try:
name = "test_remove_disk_from_policy"
engine = "MergeTree()"
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "test_remove_disk_from_policy_cool_policy", {"volume1": ["jbod3","jbod4"]})
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
volumes = set(node1.query("SELECT volume_name FROM system.storage_policies WHERE policy_name = 'test_remove_disk_from_policy_cool_policy'").splitlines())
disks_sets = set(node1.query("SELECT disks FROM system.storage_policies WHERE policy_name = 'test_remove_disk_from_policy_cool_policy'").splitlines())
assert {"volume1"} == volumes
assert {"['jbod3','jbod4']"} == disks_sets
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "cool_policy", {"volume1": ["jbod3"]})
node1.query("SYSTEM RELOAD CONFIG")
volumes = set(node1.query("SELECT volume_name FROM system.storage_policies WHERE policy_name = 'test_remove_disk_from_policy_cool_policy'").splitlines())
disks_sets = set(node1.query("SELECT disks FROM system.storage_policies WHERE policy_name = 'test_remove_disk_from_policy_cool_policy'").splitlines())
assert {"volume1"} == volumes
assert {"['jbod3','jbod4']"} == disks_sets
assert re.search("Error.*test_remove_disk_from_policy_cool_policy", get_log(node1))
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册